تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم

يا هبر!

هل تحب الطيران بالطائرات؟ أحب ذلك، ولكن أثناء العزلة الذاتية، أحببت أيضًا تحليل البيانات المتعلقة بتذاكر الطيران من أحد الموارد المعروفة - Aviasales.

سنقوم اليوم بتحليل عمل Amazon Kinesis، وإنشاء نظام دفق مع تحليلات في الوقت الفعلي، وتثبيت قاعدة بيانات Amazon DynamoDB NoSQL باعتبارها مخزن البيانات الرئيسي، وإعداد إشعارات الرسائل القصيرة للتذاكر المثيرة للاهتمام.

كل التفاصيل تحت الخفض! يذهب!

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم

مقدمة

على سبيل المثال، نحن بحاجة إلى الوصول إلى واجهة برمجة تطبيقات Aviasales. يتم توفير الوصول إليه مجانًا وبدون قيود؛ ما عليك سوى التسجيل في قسم "المطورين" لتلقي رمز واجهة برمجة التطبيقات (API) الخاص بك للوصول إلى البيانات.

الغرض الرئيسي من هذه المقالة هو إعطاء فهم عام لاستخدام تدفق المعلومات في AWS؛ ونحن نأخذ في الاعتبار أن البيانات التي يتم إرجاعها بواسطة واجهة برمجة التطبيقات المستخدمة ليست محدثة بشكل صارم ويتم إرسالها من ذاكرة التخزين المؤقت، وهي تم تشكيلها بناءً على عمليات البحث التي أجراها مستخدمو موقعي Aviasales.ru وJetradar.com خلال الـ 48 ساعة الماضية.

سيقوم عامل Kinesis، المثبت على جهاز الإنتاج، والذي يتم استلامه عبر واجهة برمجة التطبيقات، بتحليل البيانات وإرسالها تلقائيًا إلى التدفق المطلوب عبر Kinesis Data Analytics. ستتم كتابة النسخة الأولية من هذا الدفق مباشرة إلى المتجر. سيسمح تخزين البيانات الأولية المنشور في DynamoDB بتحليل أعمق للتذاكر من خلال أدوات ذكاء الأعمال، مثل AWS Quick Sight.

سننظر في خيارين لنشر البنية التحتية بأكملها:

  • يدويًا - عبر وحدة الإدارة في AWS؛
  • البنية التحتية من كود Terraform مخصصة للأتمتة الكسولة؛

هندسة النظام المتطور

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
المكونات المستخدمة:

  • واجهة برمجة تطبيقات Aviasales - سيتم استخدام البيانات التي يتم إرجاعها بواسطة واجهة برمجة التطبيقات هذه لجميع الأعمال اللاحقة؛
  • مثيل منتج EC2 — جهاز افتراضي عادي في السحابة سيتم إنشاء دفق بيانات الإدخال عليه:
    • وكيل Kinesis هو تطبيق Java يتم تثبيته محليًا على الجهاز ويوفر طريقة سهلة لجمع البيانات وإرسالها إلى Kinesis (Kinesis Data Streams أو Kinesis Firehose). يراقب الوكيل باستمرار مجموعة من الملفات في الدلائل المحددة ويرسل بيانات جديدة إلى Kinesis؛
    • البرنامج النصي لواجهة برمجة التطبيقات للمتصل - برنامج نصي بلغة Python يقدم طلبات إلى واجهة برمجة التطبيقات (API) ويضع الاستجابة في مجلد تتم مراقبته بواسطة وكيل Kinesis؛
  • تيارات بيانات Kinesis - خدمة تدفق البيانات في الوقت الفعلي مع إمكانيات توسيع واسعة النطاق؛
  • تحليلات كينيسيس هي خدمة بدون خادم تعمل على تبسيط تحليل تدفق البيانات في الوقت الفعلي. يقوم Amazon Kinesis Data Analytics بتكوين موارد التطبيق والتكيف تلقائيًا للتعامل مع أي حجم من البيانات الواردة؛
  • AWS لامدا — خدمة تتيح لك تشغيل التعليمات البرمجية دون إجراء نسخ احتياطي أو إعداد خوادم. يتم ضبط كل قوة الحوسبة تلقائيًا لكل مكالمة؛
  • الأمازون DynamoDB - قاعدة بيانات لأزواج القيمة الرئيسية والمستندات التي توفر زمن وصول أقل من 10 مللي ثانية عند التشغيل على أي نطاق. عند استخدام DynamoDB، لا تحتاج إلى توفير أي خوادم أو تصحيحها أو إدارتها. يقوم DynamoDB تلقائيًا بقياس الجداول لضبط مقدار الموارد المتاحة والحفاظ على الأداء العالي. لا يلزم إدارة النظام؛
  • أمازون SNS - خدمة مُدارة بالكامل لإرسال الرسائل باستخدام نموذج الناشر والمشترك (Pub/Sub)، والذي يمكنك من خلاله عزل الخدمات الصغيرة والأنظمة الموزعة والتطبيقات التي لا تحتوي على خادم. يمكن استخدام SNS لإرسال المعلومات إلى المستخدمين النهائيين من خلال إشعارات الهاتف المحمول والرسائل النصية القصيرة ورسائل البريد الإلكتروني.

تدريب اولي

لمحاكاة تدفق البيانات، قررت استخدام معلومات تذكرة الطيران التي تم إرجاعها بواسطة Aviasales API. في توثيق قائمة واسعة جدًا من الطرق المختلفة، فلنأخذ إحداها - "تقويم الأسعار الشهرية"، الذي يعرض الأسعار لكل يوم من أيام الشهر، مجمعة حسب عدد التحويلات. إذا لم تحدد شهر البحث في الطلب، فسيتم إرجاع المعلومات للشهر التالي للشهر الحالي.

لذلك، دعونا نسجل ونحصل على رمزنا المميز.

مثال للطلب أدناه:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

الطريقة المذكورة أعلاه لتلقي البيانات من واجهة برمجة التطبيقات (API) عن طريق تحديد رمز مميز في الطلب ستعمل، لكنني أفضل تمرير رمز الوصول من خلال الرأس، لذلك سنستخدم هذه الطريقة في البرنامج النصي api_caller.py.

مثال الإجابة:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

يُظهر مثال استجابة واجهة برمجة التطبيقات (API) أعلاه تذكرة سفر من سانت بطرسبرغ إلى فوك... يا له من حلم...
نظرًا لأنني من قازان، وأصبحت فوكيت الآن "مجرد حلم"، فلنبحث عن التذاكر من سان بطرسبرج إلى قازان.

يفترض أن لديك بالفعل حساب AWS. أود أن ألفت انتباهًا خاصًا على الفور إلى حقيقة أن Kinesis وإرسال الإشعارات عبر الرسائل القصيرة غير مدرجين في الخدمة السنوية الطبقة المجانية (الاستخدام المجاني). ولكن على الرغم من ذلك، مع وضع بضعة دولارات في الاعتبار، فمن الممكن تمامًا بناء النظام المقترح واللعب به. وبالطبع، لا تنس حذف كافة الموارد بعد أن لم تعد هناك حاجة إليها.

لحسن الحظ، ستكون وظائف DynamoDb وlamda مجانية لنا إذا استوفينا حدودنا المجانية الشهرية. على سبيل المثال، بالنسبة إلى DynamoDB: مساحة تخزين تبلغ 25 جيجابايت و25 WCU/RCU و100 مليون استعلام. ومليون مكالمة دالة لامدا شهريًا.

النشر اليدوي للنظام

إعداد تدفقات بيانات Kinesis

دعنا ننتقل إلى خدمة Kinesis Data Streams ونقوم بإنشاء دفقين جديدين، جزء واحد لكل منهما.

ما هي القشرة؟
الجزء هو وحدة نقل البيانات الأساسية لتدفق Amazon Kinesis. يوفر أحد المقاطع نقل بيانات الإدخال بسرعة 1 ميجابايت/ثانية ونقل بيانات الإخراج بسرعة 2 ميجابايت/ثانية. يدعم مقطع واحد ما يصل إلى 1000 إدخال PUT في الثانية. عند إنشاء دفق بيانات، تحتاج إلى تحديد العدد المطلوب من الشرائح. على سبيل المثال، يمكنك إنشاء دفق بيانات يتكون من جزأين. سيوفر تدفق البيانات هذا نقل بيانات الإدخال بسرعة 2 ميجابايت/ثانية ونقل بيانات الإخراج بسرعة 4 ميجابايت/ثانية، مما يدعم ما يصل إلى 2000 سجل PUT في الثانية.

كلما زاد عدد القطع في الدفق الخاص بك، زادت إنتاجيته. من حيث المبدأ، هذه هي الطريقة التي يتم بها قياس التدفقات - عن طريق إضافة القطع. ولكن كلما زاد عدد القطع التي لديك، ارتفع السعر. تبلغ تكلفة كل جزء 1,5 سنتًا في الساعة و1.4 سنتًا إضافيًا لكل مليون وحدة حمولة PUT.

لنقم بإنشاء دفق جديد بالاسم تذاكر الطيران، 1 شظية ستكون كافية له:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
الآن لنقم بإنشاء موضوع آخر بالاسم Special_stream:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم

إعداد المنتج

لتحليل مهمة ما، يكفي استخدام مثيل EC2 عادي كمنتج للبيانات. ليس من الضروري أن تكون آلة افتراضية قوية ومكلفة، فجهاز t2.micro الفوري سيكون جيدًا.

ملاحظة مهمة: على سبيل المثال، يجب عليك استخدام image - Amazon Linux AMI 2018.03.0، فهي تحتوي على إعدادات أقل لبدء تشغيل Kinesis Agent بسرعة.

انتقل إلى خدمة EC2، وقم بإنشاء جهاز افتراضي جديد، وحدد AMI المطلوب بالنوع t2.micro، والذي تم تضمينه في الطبقة المجانية:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
لكي يتمكن الجهاز الظاهري الذي تم إنشاؤه حديثًا من التفاعل مع خدمة Kinesis، يجب منحه حقوق القيام بذلك. أفضل طريقة للقيام بذلك هي تعيين دور IAM. ولذلك، في شاشة الخطوة 3: تكوين تفاصيل المثيل، يجب عليك تحديد إنشاء دور IAM جديد:

إنشاء دور IAM لـ EC2
تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
في النافذة التي تفتح، حدد أننا نقوم بإنشاء دور جديد لـ EC2 وانتقل إلى قسم الأذونات:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
باستخدام المثال التدريبي، لا يتعين علينا الخوض في كل تعقيدات التكوين الدقيق لحقوق الموارد، لذلك سنختار السياسات التي تم تكوينها مسبقًا بواسطة Amazon: AmazonKinesisFullAccess وCloudWatchFullAccess.

دعنا نعطي اسمًا ذا معنى لهذا الدور، على سبيل المثال: EC2-KinesisStreams-FullAccess. يجب أن تكون النتيجة هي نفسها كما هو موضح في الصورة أدناه:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
بعد إنشاء هذا الدور الجديد، لا تنس إرفاقه بمثيل الجهاز الظاهري الذي تم إنشاؤه:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
لا نغير أي شيء آخر على هذه الشاشة وننتقل إلى النوافذ التالية.

يمكن ترك إعدادات محرك الأقراص الثابتة كإعدادات افتراضية، بالإضافة إلى العلامات (على الرغم من أنه من الممارسات الجيدة استخدام العلامات، على الأقل قم بتسمية المثيل والإشارة إلى البيئة).

نحن الآن في الخطوة 6: تكوين علامة التبويب مجموعة الأمان، حيث تحتاج إلى إنشاء مجموعة جديدة أو تحديد مجموعة الأمان الموجودة لديك، والتي تسمح لك بالاتصال عبر ssh (المنفذ 22) بالمثيل. حدد المصدر -> عنوان IP الخاص بي هناك ويمكنك تشغيل المثيل.

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
بمجرد تحوله إلى حالة التشغيل، يمكنك محاولة الاتصال به عبر ssh.

لتتمكن من العمل مع Kinesis Agent، بعد الاتصال بالجهاز بنجاح، يجب عليك إدخال الأوامر التالية في الجهاز:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

لنقم بإنشاء مجلد لحفظ استجابات API:

sudo mkdir /var/log/airline_tickets

قبل بدء تشغيل الوكيل، تحتاج إلى تكوين التكوين الخاص به:

sudo vim /etc/aws-kinesis/agent.json

يجب أن تبدو محتويات ملف agent.json كما يلي:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

كما يتبين من ملف التكوين، سيقوم الوكيل بمراقبة الملفات ذات الامتداد .log في الدليل /var/log/airline_tickets/، وتحليلها ونقلها إلى تدفق Airlines_tickets.

نقوم بإعادة تشغيل الخدمة والتأكد من أنها قيد التشغيل:

sudo service aws-kinesis-agent restart

لنقم الآن بتنزيل نص Python الذي سيطلب البيانات من واجهة برمجة التطبيقات:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

يطلب البرنامج النصي api_caller.py البيانات من Aviasales ويحفظ الاستجابة المستلمة في الدليل الذي يقوم وكيل Kinesis بفحصه. يعد تنفيذ هذا البرنامج النصي قياسيًا تمامًا، وهناك فئة TicketsApi، وهي تتيح لك سحب واجهة برمجة التطبيقات بشكل غير متزامن. نقوم بتمرير رأس برمز مميز ونطلب المعلمات إلى هذه الفئة:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

لاختبار الإعدادات والوظائف الصحيحة للوكيل، فلنختبر تشغيل البرنامج النصي api_caller.py:

sudo ./api_caller.py TOKEN

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
ونحن ننظر إلى نتيجة العمل في سجلات الوكيل وفي علامة التبويب "المراقبة" في دفق بيانات تذكرة الطيران:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
كما ترون، كل شيء يعمل ويقوم وكيل Kinesis بإرسال البيانات بنجاح إلى الدفق. الآن دعونا نقوم بتكوين Consumer.

إعداد تحليلات بيانات Kinesis

دعنا ننتقل إلى المكون المركزي للنظام بأكمله - أنشئ تطبيقًا جديدًا في Kinesis Data Analytics باسم kinesis_analytics_airlines_app:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
يتيح لك Kinesis Data Analytics إجراء تحليلات البيانات في الوقت الفعلي من Kinesis Streams باستخدام لغة SQL. إنها خدمة تحجيم تلقائي بالكامل (على عكس Kinesis Streams) التي:

  1. يسمح لك بإنشاء تدفقات جديدة (دفق الإخراج) بناءً على طلبات البيانات المصدر؛
  2. يوفر دفقًا يحتوي على الأخطاء التي حدثت أثناء تشغيل التطبيقات (دفق الأخطاء)؛
  3. يمكنه تحديد مخطط بيانات الإدخال تلقائيًا (يمكن إعادة تعريفه يدويًا إذا لزم الأمر).

هذه ليست خدمة رخيصة - 0.11 دولارًا أمريكيًا لكل ساعة عمل، لذا يجب عليك استخدامها بعناية وحذفها عند الانتهاء.

لنقم بتوصيل التطبيق بمصدر البيانات:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
حدد البث الذي سنتصل به (airline_tickets):

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
بعد ذلك، تحتاج إلى إرفاق دور IAM جديد حتى يتمكن التطبيق من القراءة من الدفق والكتابة فيه. للقيام بذلك، يكفي عدم تغيير أي شيء في كتلة أذونات الوصول:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
لنطلب الآن اكتشاف مخطط البيانات في الدفق؛ للقيام بذلك، انقر فوق الزر "اكتشاف المخطط". ونتيجة لذلك، سيتم تحديث دور IAM (سيتم إنشاء دور جديد) وسيتم إطلاق اكتشاف المخطط من البيانات التي وصلت بالفعل إلى الدفق:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
أنت الآن بحاجة للذهاب إلى محرر SQL. عند النقر على هذا الزر، ستظهر نافذة تطلب منك تشغيل التطبيق - اختر ما تريد تشغيله:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
أدخل الاستعلام البسيط التالي في نافذة محرر SQL وانقر فوق Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

في قواعد البيانات العلائقية، يمكنك العمل مع الجداول باستخدام عبارات INSERT لإضافة سجلات وعبارة SELECT للاستعلام عن البيانات. في Amazon Kinesis Data Analytics، أنت تعمل مع التدفقات (STREAMs) والمضخات (PUMPs) - طلبات الإدراج المستمرة التي تدرج البيانات من تدفق واحد في تطبيق إلى تدفق آخر.

يقوم استعلام SQL الموضح أعلاه بالبحث عن تذاكر إيروفلوت بتكلفة أقل من خمسة آلاف روبل. سيتم وضع كافة السجلات التي تستوفي هذه الشروط في دفق DESTINATION_SQL_STREAM.

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
في كتلة الوجهة، حدد الدفق Special_stream، وفي القائمة المنسدلة اسم الدفق داخل التطبيق DESTINATION_SQL_STREAM:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
يجب أن تكون نتيجة جميع التلاعبات مشابهة للصورة أدناه:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم

إنشاء والاشتراك في موضوع SNS

انتقل إلى خدمة الإشعارات البسيطة وقم بإنشاء موضوع جديد هناك باسم شركات الطيران:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
اشترك في هذا الموضوع وحدد رقم الهاتف المحمول الذي سيتم إرسال إشعارات الرسائل القصيرة إليه:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم

إنشاء جدول في DynamoDB

لتخزين البيانات الأولية من تدفق تذاكر الطيران الخاصة بهم، فلنقم بإنشاء جدول في DynamoDB بنفس الاسم. سوف نستخدم Record_id كمفتاح أساسي:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم

إنشاء جامع دالة لامدا

لنقم بإنشاء دالة lambda تسمى Collector، والتي ستكون مهمتها استطلاع تدفق تذاكر الطيران، وإذا تم العثور على سجلات جديدة هناك، فقم بإدراج هذه السجلات في جدول DynamoDB. من الواضح، بالإضافة إلى الحقوق الافتراضية، يجب أن يكون لدى lambda حق الوصول للقراءة إلى دفق بيانات Kinesis وحق الوصول للكتابة إلى DynamoDB.

إنشاء دور IAM لوظيفة Collector lambda
أولاً، لنقم بإنشاء دور IAM جديد لـ lambda المسمى Lambda-TicketsProcessingRole:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
بالنسبة لمثال الاختبار، تعد سياسات AmazonKinesisReadOnlyAccess وAmazonDynamoDBFullAccess التي تم تكوينها مسبقًا مناسبة تمامًا، كما هو موضح في الصورة أدناه:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم

يجب إطلاق lambda بواسطة مشغل من Kinesis عندما تدخل إدخالات جديدة إلى خط الطيران، لذلك نحتاج إلى إضافة مشغل جديد:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
كل ما تبقى هو إدخال الكود وحفظ لامدا.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

إنشاء منبه دالة لامدا

يتم إنشاء وظيفة lambda الثانية بطريقة مماثلة، والتي ستراقب الدفق الثاني (special_stream) وترسل إشعارًا إلى SNS. لذلك، يجب أن يكون لدى lambda حق الوصول للقراءة من Kinesis وإرسال الرسائل إلى موضوع SNS معين، والذي سيتم بعد ذلك إرساله بواسطة خدمة SNS إلى جميع المشتركين في هذا الموضوع (البريد الإلكتروني، والرسائل النصية القصيرة، وما إلى ذلك).

إنشاء دور IAM
أولاً، نقوم بإنشاء دور IAM Lambda-KinesisAlarm لـ lambda، ثم نقوم بتعيين هذا الدور إلى Alarm_notifier lambda الذي يتم إنشاؤه:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم

يجب أن يعمل lambda هذا على مشغل للسجلات الجديدة للدخول إلى Special_stream، لذلك تحتاج إلى تكوين المشغل بنفس الطريقة التي قمنا بها مع Collector lambda.

لتسهيل تكوين لامدا، دعنا نقدم متغير بيئة جديد - TOPIC_ARN، حيث نضع ANR (أسماء موارد أمازون) لموضوع الخطوط الجوية:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
وأدخل رمز لامدا، فهو ليس معقدًا على الإطلاق:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

يبدو أن هذا هو المكان الذي اكتمل فيه التكوين اليدوي للنظام. كل ما تبقى هو الاختبار والتأكد من أننا قمنا بتكوين كل شيء بشكل صحيح.

النشر من كود Terraform

التحضير اللازم

Terraform هي أداة مفتوحة المصدر مريحة للغاية لنشر البنية التحتية من التعليمات البرمجية. لديه بناء الجملة الخاص به الذي يسهل تعلمه ويحتوي على العديد من الأمثلة حول كيفية نشره وما يجب نشره. يحتوي محرر Atom أو Visual Studio Code على العديد من المكونات الإضافية المفيدة التي تجعل العمل مع Terraform أسهل.

يمكنك تحميل التوزيعة من هنا. إن التحليل التفصيلي لجميع إمكانيات Terraform هو خارج نطاق هذه المقالة، لذلك سنقتصر على النقاط الرئيسية.

كيفية الجري

الكود الكامل للمشروع هو في مستودع بلدي. نحن نستنسخ المستودع لأنفسنا. قبل البدء، تحتاج إلى التأكد من تثبيت AWS CLI وتكوينه، لأن... سيبحث Terraform عن بيانات الاعتماد في ملف ~/.aws/credentials.

من الممارسات الجيدة تشغيل أمر الخطة قبل نشر البنية التحتية بالكامل لمعرفة ما يقوم Terraform بإنشائه حاليًا لنا في السحابة:

terraform.exe plan

سيُطلب منك إدخال رقم هاتف لإرسال الإشعارات إليه. ليس من الضروري إدخاله في هذه المرحلة.

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
بعد تحليل خطة تشغيل البرنامج، يمكننا البدء في إنشاء الموارد:

terraform.exe apply

بعد إرسال هذا الأمر، سيُطلب منك مرة أخرى إدخال رقم هاتف؛ اطلب "نعم" عندما يظهر سؤال حول تنفيذ الإجراءات فعليًا. سيسمح لك هذا بإعداد البنية التحتية بالكامل، وتنفيذ جميع عمليات التكوين الضرورية لـ EC2، ونشر وظائف lambda، وما إلى ذلك.

بعد إنشاء جميع الموارد بنجاح من خلال كود Terraform، يتعين عليك الدخول في تفاصيل تطبيق Kinesis Analytics (للأسف، لم أجد كيفية القيام بذلك مباشرة من الكود).

بدء تطبيق:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
بعد ذلك، يجب عليك تعيين اسم الدفق داخل التطبيق بشكل صريح عن طريق التحديد من القائمة المنسدلة:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
الآن كل شيء جاهز للذهاب.

اختبار التطبيق

بغض النظر عن كيفية نشر النظام، يدويًا أو من خلال كود Terraform، فإنه سيعمل بنفس الطريقة.

نقوم بتسجيل الدخول عبر SSH إلى الجهاز الظاهري EC2 حيث تم تثبيت Kinesis Agent وتشغيل البرنامج النصي api_caller.py

sudo ./api_caller.py TOKEN

كل ما عليك فعله هو انتظار رسالة نصية على رقمك:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
الرسائل القصيرة - تصل الرسالة إلى هاتفك خلال دقيقة واحدة تقريبًا:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم
يبقى أن نرى ما إذا كان قد تم حفظ السجلات في قاعدة بيانات DynamoDB لإجراء تحليل لاحق أكثر تفصيلاً. يحتوي جدول Airlines_tickets على البيانات التالية تقريبًا:

تكامل Aviasales API مع Amazon Kinesis والبساطة بدون خادم

اختتام

في سياق العمل المنجز، تم إنشاء نظام معالجة البيانات عبر الإنترنت استنادًا إلى Amazon Kinesis. تم النظر في خيارات استخدام Kinesis Agent بالتزامن مع Kinesis Data Streams والتحليلات في الوقت الفعلي Kinesis Analytics باستخدام أوامر SQL، بالإضافة إلى تفاعل Amazon Kinesis مع خدمات AWS الأخرى.

لقد قمنا بنشر النظام المذكور أعلاه بطريقتين: طريقة يدوية طويلة إلى حد ما وطريقة سريعة من كود Terraform.

كل كود مصدر المشروع متاح في مستودع جيثب الخاص بي، أقترح عليك أن تتعرف عليه.

يسعدني مناقشة المقال، وأتطلع إلى تعليقاتكم. أتمنى النقد البناء .

وأتمنى لكم النجاح!

المصدر: www.habr.com

إضافة تعليق