Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless

Này Habr!

Bạn có thích lái máy bay không? Tôi thích nó, nhưng trong thời gian tự cách ly, tôi cũng thích phân tích dữ liệu về vé máy bay từ một nguồn tài nguyên nổi tiếng - Aviasale.

Hôm nay chúng ta sẽ phân tích công việc của Amazon Kinesis, xây dựng hệ thống phát trực tuyến với phân tích thời gian thực, cài đặt cơ sở dữ liệu Amazon DynamoDB NoSQL làm nơi lưu trữ dữ liệu chính và thiết lập thông báo SMS cho các yêu cầu thú vị.

Tất cả các chi tiết đều được cắt! Đi!

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless

Giới thiệu

Ví dụ: chúng ta cần truy cập vào API Aviasale. Quyền truy cập vào nó được cung cấp miễn phí và không có hạn chế; bạn chỉ cần đăng ký trong phần “Nhà phát triển” để nhận mã thông báo API của mình để truy cập dữ liệu.

Mục đích chính của bài viết này là cung cấp sự hiểu biết chung về việc sử dụng tính năng truyền phát thông tin trong AWS; chúng tôi lưu ý rằng dữ liệu được API sử dụng trả về không được cập nhật nghiêm ngặt và được truyền từ bộ đệm. được hình thành dựa trên tìm kiếm của người dùng trang Aviasale.ru và Jetradar.com trong 48 giờ qua.

Kinesis-agent, được cài đặt trên máy sản xuất, được nhận qua API sẽ tự động phân tích cú pháp và truyền dữ liệu đến luồng mong muốn thông qua Kinesis Data Analytics. Phiên bản thô của luồng này sẽ được ghi trực tiếp vào cửa hàng. Bộ lưu trữ dữ liệu thô được triển khai trong DynamoDB sẽ cho phép phân tích yêu cầu sâu hơn thông qua các công cụ BI, chẳng hạn như AWS Quick Sight.

Chúng tôi sẽ xem xét hai tùy chọn để triển khai toàn bộ cơ sở hạ tầng:

  • Hướng dẫn sử dụng - thông qua Bảng điều khiển quản lý AWS;
  • Cơ sở hạ tầng từ mã Terraform dành cho những người tự động hóa lười biếng;

Kiến trúc của hệ thống đã phát triển

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Các thành phần được sử dụng:

  • API Aviasale — dữ liệu được API này trả về sẽ được sử dụng cho tất cả công việc tiếp theo;
  • Phiên bản nhà sản xuất EC2 - một máy ảo thông thường trên đám mây nơi luồng dữ liệu đầu vào sẽ được tạo:
    • Đại lý Kinesis là một ứng dụng Java được cài đặt cục bộ trên máy, cung cấp một cách dễ dàng để thu thập và gửi dữ liệu tới Kinesis (Kinesis Data Streams hoặc Kinesis Firehose). Tác nhân liên tục giám sát một tập hợp tệp trong các thư mục được chỉ định và gửi dữ liệu mới đến Kinesis;
    • Tập lệnh gọi API — Một tập lệnh Python đưa ra các yêu cầu tới API và đưa phản hồi vào một thư mục được Kinesis Agent giám sát;
  • Luồng dữ liệu Kinesis — dịch vụ truyền dữ liệu theo thời gian thực với khả năng mở rộng quy mô;
  • Phân tích Kinesis là một dịch vụ không có máy chủ giúp đơn giản hóa việc phân tích dữ liệu truyền phát trong thời gian thực. Amazon Kinesis Data Analytics định cấu hình tài nguyên ứng dụng và tự động thay đổi quy mô để xử lý mọi khối lượng dữ liệu đến;
  • AWS Lambda — một dịch vụ cho phép bạn chạy mã mà không cần sao lưu hoặc thiết lập máy chủ. Tất cả sức mạnh tính toán được tự động điều chỉnh cho mỗi cuộc gọi;
  • Máy phát điện Amazon - Cơ sở dữ liệu gồm các cặp khóa-giá trị và tài liệu cung cấp độ trễ dưới 10 mili giây khi chạy ở mọi quy mô. Khi sử dụng DynamoDB, bạn không cần cung cấp, vá lỗi hoặc quản lý bất kỳ máy chủ nào. DynamoDB tự động chia tỷ lệ bảng để điều chỉnh lượng tài nguyên sẵn có và duy trì hiệu suất cao. Không cần quản trị hệ thống;
  • Amazon SNS - một dịch vụ được quản lý hoàn toàn để gửi tin nhắn bằng mô hình nhà xuất bản-người đăng ký (Pub/Sub), nhờ đó bạn có thể tách biệt các vi dịch vụ, hệ thống phân tán và ứng dụng không có máy chủ. SNS có thể được sử dụng để gửi thông tin đến người dùng cuối thông qua thông báo đẩy trên thiết bị di động, tin nhắn SMS và email.

Đào tạo cơ bản

Để mô phỏng luồng dữ liệu, tôi quyết định sử dụng thông tin vé máy bay được API Aviasale trả về. TRONG tài liệu Một danh sách khá phong phú gồm các phương pháp khác nhau, hãy lấy một trong số chúng - "Lịch giá hàng tháng", trả về giá cho mỗi ngày trong tháng, được nhóm theo số lần chuyển. Nếu bạn không chỉ định tháng tìm kiếm trong yêu cầu, thông tin sẽ được trả về cho tháng tiếp theo tháng hiện tại.

Vì vậy, hãy đăng ký và nhận mã thông báo của chúng tôi.

Một yêu cầu ví dụ dưới đây:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Phương pháp nhận dữ liệu từ API bằng cách chỉ định mã thông báo trong yêu cầu ở trên sẽ hoạt động, nhưng tôi thích chuyển mã thông báo truy cập qua tiêu đề hơn, vì vậy chúng tôi sẽ sử dụng phương thức này trong tập lệnh api_caller.py.

Ví dụ trả lời:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Phản hồi API mẫu ở trên hiển thị một vé từ St. Petersburg đến Phuk... Ôi, thật là một giấc mơ...
Vì tôi đến từ Kazan và Phuket giờ “chỉ là giấc mơ”, nên hãy tìm vé từ St. Petersburg đến Kazan.

Nó giả định rằng bạn đã có tài khoản AWS. Tôi muốn đặc biệt chú ý đến thực tế là Kinesis và gửi thông báo qua SMS không được bao gồm trong báo cáo hàng năm. Bậc miễn phí (sử dụng miễn phí). Nhưng ngay cả bất chấp điều này, với một vài đô la, bạn hoàn toàn có thể xây dựng hệ thống được đề xuất và thử nghiệm với nó. Và tất nhiên, đừng quên xóa tất cả các tài nguyên sau khi chúng không còn cần thiết nữa.

May mắn thay, các hàm DynamoDb và lambda sẽ miễn phí cho chúng tôi nếu chúng tôi đáp ứng giới hạn miễn phí hàng tháng của mình. Ví dụ: đối với DynamoDB: 25 GB dung lượng lưu trữ, 25 WCU/RCU và 100 triệu truy vấn. Và một triệu lệnh gọi hàm lambda mỗi tháng.

Triển khai hệ thống thủ công

Thiết lập luồng dữ liệu Kinesis

Hãy truy cập dịch vụ Kinesis Data Streams và tạo hai luồng mới, mỗi luồng một phân đoạn.

Một mảnh vỡ là gì?
Phân đoạn là đơn vị truyền dữ liệu cơ bản của luồng Amazon Kinesis. Một phân đoạn cung cấp khả năng truyền dữ liệu đầu vào với tốc độ 1 MB/s và truyền dữ liệu đầu ra với tốc độ 2 MB/s. Một phân đoạn hỗ trợ tới 1000 mục PUT mỗi giây. Khi tạo luồng dữ liệu, bạn cần chỉ định số lượng phân đoạn được yêu cầu. Ví dụ: bạn có thể tạo luồng dữ liệu có hai phân đoạn. Luồng dữ liệu này sẽ cung cấp khả năng truyền dữ liệu đầu vào với tốc độ 2 MB/s và truyền dữ liệu đầu ra với tốc độ 4 MB/s, hỗ trợ tới 2000 bản ghi PUT mỗi giây.

Càng nhiều phân đoạn trong luồng của bạn thì thông lượng của nó càng lớn. Về nguyên tắc, đây là cách quy mô các luồng - bằng cách thêm các phân đoạn. Nhưng bạn càng có nhiều mảnh thì giá càng cao. Mỗi phân đoạn có giá 1,5 xu mỗi giờ và thêm 1.4 xu cho mỗi triệu đơn vị tải trọng PUT.

Hãy tạo một luồng mới với tên vé máy bay, 1 mảnh vỡ sẽ là đủ cho anh ta:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Bây giờ hãy tạo một chủ đề khác với tên dòng_đặc biệt:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless

Thiết lập nhà sản xuất

Để phân tích một tác vụ, chỉ cần sử dụng phiên bản EC2 thông thường làm nhà sản xuất dữ liệu là đủ. Nó không cần phải là một máy ảo mạnh mẽ và đắt tiền, một t2.micro Spot sẽ hoạt động tốt.

Lưu ý quan trọng: ví dụ: bạn nên sử dụng hình ảnh - Amazon Linux AMI 2018.03.0, nó có ít cài đặt hơn để khởi chạy nhanh Kinesis Agent.

Đi tới dịch vụ EC2, tạo một máy ảo mới, chọn AMI mong muốn với loại t2.micro, được bao gồm trong Bậc miễn phí:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Để máy ảo mới được tạo có thể tương tác với dịch vụ Kinesis, nó phải được cấp quyền để làm như vậy. Cách tốt nhất để thực hiện việc này là chỉ định Vai trò IAM. Do đó, trên màn hình Bước 3: Định cấu hình chi tiết phiên bản, bạn nên chọn Tạo vai trò IAM mới:

Tạo vai trò IAM cho EC2
Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Trong cửa sổ mở ra, chọn rằng chúng tôi đang tạo vai trò mới cho EC2 và đi tới phần Quyền:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Sử dụng ví dụ đào tạo, chúng ta không cần phải đi sâu vào tất cả sự phức tạp của cấu hình chi tiết về quyền tài nguyên, vì vậy, chúng ta sẽ chọn các chính sách được Amazon định cấu hình trước: AmazonKinesisFullAccess và CloudWatchFullAccess.

Hãy đặt một số tên có ý nghĩa cho vai trò này, ví dụ: EC2-KinesisStreams-FullAccess. Kết quả sẽ giống như trong hình dưới đây:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Sau khi tạo vai trò mới này, đừng quên đính kèm nó vào phiên bản máy ảo đã tạo:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Chúng tôi không thay đổi bất cứ điều gì khác trên màn hình này và chuyển sang các cửa sổ tiếp theo.

Cài đặt ổ cứng có thể được để làm mặc định, cũng như các thẻ (mặc dù cách tốt nhất là sử dụng thẻ, nhưng ít nhất hãy đặt tên cho phiên bản và cho biết môi trường).

Bây giờ chúng ta đang ở tab Bước 6: Định cấu hình Nhóm bảo mật, nơi bạn cần tạo một nhóm mới hoặc chỉ định nhóm Bảo mật hiện có của mình, điều này cho phép bạn kết nối qua ssh (cổng 22) với phiên bản. Chọn Nguồn -> IP của tôi ở đó và bạn có thể khởi chạy phiên bản.

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Ngay khi nó chuyển sang trạng thái đang chạy, bạn có thể thử kết nối với nó qua ssh.

Để có thể làm việc với Kinesis Agent, sau khi kết nối với máy thành công, bạn phải nhập các lệnh sau vào terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Hãy tạo một thư mục để lưu phản hồi API:

sudo mkdir /var/log/airline_tickets

Trước khi khởi động tác nhân, bạn cần định cấu hình cấu hình của nó:

sudo vim /etc/aws-kinesis/agent.json

Nội dung của tệp Agent.json sẽ trông như thế này:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Như có thể thấy từ tệp cấu hình, tác nhân sẽ giám sát các tệp có phần mở rộng .log trong thư mục /var/log/airline_tickets/, phân tích cú pháp chúng và chuyển chúng sang luồng hãng hàng không_tickets.

Chúng tôi khởi động lại dịch vụ và đảm bảo rằng nó đã hoạt động:

sudo service aws-kinesis-agent restart

Bây giờ hãy tải xuống tập lệnh Python sẽ yêu cầu dữ liệu từ API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Tập lệnh api_caller.py yêu cầu dữ liệu từ Aviasale và lưu phản hồi nhận được vào thư mục mà tác nhân Kinesis quét. Việc triển khai tập lệnh này khá chuẩn, có lớp TicketsApi, nó cho phép bạn kéo API không đồng bộ. Chúng tôi chuyển tiêu đề có mã thông báo và tham số yêu cầu cho lớp này:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Để kiểm tra cài đặt và chức năng chính xác của tác nhân, hãy chạy thử tập lệnh api_caller.py:

sudo ./api_caller.py TOKEN

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Và chúng tôi xem xét kết quả công việc trong nhật ký Đại lý và trên tab Giám sát trong luồng dữ liệu hãng hàng không_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Như bạn có thể thấy, mọi thứ đều hoạt động và Kinesis Agent gửi dữ liệu tới luồng thành công. Bây giờ hãy cấu hình Consumer.

Thiết lập phân tích dữ liệu Kinesis

Hãy chuyển sang thành phần trung tâm của toàn bộ hệ thống - tạo một ứng dụng mới trong Kinesis Data Analytics có tên kinesis_analytics_airlines_app:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Kinesis Data Analytics cho phép bạn thực hiện phân tích dữ liệu theo thời gian thực từ Kinesis Streams bằng ngôn ngữ SQL. Đây là một dịch vụ tự động điều chỉnh quy mô hoàn toàn (không giống như Kinesis Streams) có chức năng:

  1. cho phép bạn tạo các luồng mới (Luồng đầu ra) dựa trên các yêu cầu đối với dữ liệu nguồn;
  2. cung cấp luồng có lỗi xảy ra khi ứng dụng đang chạy (Luồng lỗi);
  3. có thể tự động xác định sơ đồ dữ liệu đầu vào (có thể xác định lại thủ công nếu cần).

Đây không phải là một dịch vụ rẻ - 0.11 USD mỗi giờ làm việc, vì vậy bạn nên sử dụng cẩn thận và xóa nó khi hoàn thành.

Hãy kết nối ứng dụng với nguồn dữ liệu:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Chọn luồng chúng tôi sẽ kết nối tới (airline_tickets):

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Tiếp theo, bạn cần đính kèm Vai trò IAM mới để ứng dụng có thể đọc từ luồng và ghi vào luồng. Để thực hiện việc này, việc không thay đổi bất kỳ điều gì trong khối quyền Truy cập là đủ:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Bây giờ, hãy yêu cầu khám phá lược đồ dữ liệu trong luồng; để thực hiện việc này, hãy nhấp vào nút “Khám phá lược đồ”. Do đó, vai trò IAM sẽ được cập nhật (một vai trò mới sẽ được tạo) và tính năng phát hiện lược đồ sẽ được khởi chạy từ dữ liệu đã có trong luồng:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Bây giờ bạn cần vào trình soạn thảo SQL. Khi bạn nhấp vào nút này, một cửa sổ sẽ xuất hiện yêu cầu bạn khởi chạy ứng dụng - chọn những gì bạn muốn khởi chạy:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Chèn truy vấn đơn giản sau vào cửa sổ soạn thảo SQL và nhấp vào Lưu và chạy SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Trong cơ sở dữ liệu quan hệ, bạn làm việc với các bảng bằng cách sử dụng câu lệnh INSERT để thêm bản ghi và câu lệnh SELECT để truy vấn dữ liệu. Trong Amazon Kinesis Data Analytics, bạn làm việc với các luồng (STREAM) và máy bơm (PUMP)—các yêu cầu chèn liên tục chèn dữ liệu từ một luồng trong ứng dụng vào một luồng khác.

Truy vấn SQL trình bày các tìm kiếm ở trên để tìm vé Aeroflot với chi phí dưới 5 nghìn rúp. Tất cả các bản ghi đáp ứng các điều kiện này sẽ được đặt trong luồng DESTINATION_SQL_STREAM.

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Trong khối Đích, chọn luồng dòng đặc biệt và trong danh sách thả xuống Tên luồng trong ứng dụng DESTINATION_SQL_STREAM:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Kết quả của tất cả các thao tác sẽ giống như hình ảnh bên dưới:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless

Tạo và đăng ký một chủ đề SNS

Đi tới Dịch vụ thông báo đơn giản và tạo một chủ đề mới ở đó với tên Hãng hàng không:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Đăng ký chủ đề này và cho biết số điện thoại di động sẽ gửi thông báo SMS:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless

Tạo bảng trong DynamoDB

Để lưu trữ dữ liệu thô từ luồngair_tickets của họ, hãy tạo một bảng có cùng tên trong DynamoDB. Chúng tôi sẽ sử dụng record_id làm khóa chính:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless

Tạo bộ thu thập hàm lambda

Hãy tạo một hàm lambda có tên là Collector. Nhiệm vụ của hàm này là thăm dò luồng Airlines_tickets và nếu tìm thấy bản ghi mới ở đó, hãy chèn các bản ghi này vào bảng DynamoDB. Rõ ràng, ngoài các quyền mặc định, lambda này phải có quyền đọc vào luồng dữ liệu Kinesis và quyền ghi vào DynamoDB.

Tạo vai trò IAM cho hàm lambda bộ sưu tập
Trước tiên, hãy tạo vai trò IAM mới cho lambda có tên Lambda-TicketsProcessingRole:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Đối với ví dụ thử nghiệm, các chính sách AmazonKinesisReadOnlyAccess và AmazonDynamoDBFullAccess được cấu hình sẵn khá phù hợp, như minh họa trong hình bên dưới:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless

Lambda này phải được khởi chạy bằng trình kích hoạt từ Kinesis khi các mục mới nhập vào hãng hàng không_stream, vì vậy chúng ta cần thêm trình kích hoạt mới:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Tất cả những gì còn lại là chèn mã và lưu lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Tạo trình thông báo hàm lambda

Hàm lambda thứ hai, sẽ giám sát luồng thứ hai (special_stream) và gửi thông báo tới SNS, được tạo theo cách tương tự. Do đó, lambda này phải có quyền truy cập để đọc từ Kinesis và gửi tin nhắn đến một chủ đề SNS nhất định, sau đó sẽ được dịch vụ SNS gửi đến tất cả người đăng ký của chủ đề này (email, SMS, v.v.).

Tạo vai trò IAM
Đầu tiên, chúng tôi tạo vai trò IAM Lambda-KinesisAlarm cho lambda này, sau đó gán vai trò này cho lambda Alarm_notifier đang được tạo:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless

Lambda này sẽ hoạt động trên một trình kích hoạt để các bản ghi mới nhập vào Special_stream, vì vậy bạn cần định cấu hình trình kích hoạt theo cách tương tự như chúng tôi đã làm cho Collector lambda.

Để giúp định cấu hình lambda này dễ dàng hơn, hãy giới thiệu một biến môi trường mới - TOPIC_ARN, trong đó chúng tôi đặt ANR (Tên truy vấn Amazon) của chủ đề Hãng hàng không:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Và chèn mã lambda vào, nó không hề phức tạp chút nào:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Có vẻ như đây là lúc quá trình cấu hình hệ thống thủ công được hoàn tất. Tất cả những gì còn lại là kiểm tra và đảm bảo rằng chúng tôi đã cấu hình mọi thứ chính xác.

Triển khai từ mã Terraform

Sự chuẩn bị cần thiết

Terraform là một công cụ nguồn mở rất thuận tiện để triển khai cơ sở hạ tầng từ mã. Nó có cú pháp riêng, dễ học và có nhiều ví dụ về cách thức cũng như nội dung cần triển khai. Trình soạn thảo Atom hoặc Visual Studio Code có nhiều plugin tiện dụng giúp làm việc với Terraform dễ dàng hơn.

Bạn có thể tải xuống bản phân phối do đó. Phân tích chi tiết về tất cả các khả năng của Terraform nằm ngoài phạm vi của bài viết này, vì vậy chúng tôi sẽ giới hạn ở những điểm chính.

Làm thế nào để bắt đầu

Mã đầy đủ của dự án là trong kho lưu trữ của tôi. Chúng tôi sao chép kho lưu trữ cho chính mình. Trước khi bắt đầu, bạn cần đảm bảo rằng bạn đã cài đặt và định cấu hình AWS CLI, vì... Terraform sẽ tìm kiếm thông tin xác thực trong tệp ~/.aws/credentials.

Một cách tốt là chạy lệnh plan trước khi triển khai toàn bộ cơ sở hạ tầng để xem Terraform hiện đang tạo gì cho chúng ta trên đám mây:

terraform.exe plan

Bạn sẽ được nhắc nhập số điện thoại để gửi thông báo tới. Không cần thiết phải nhập nó ở giai đoạn này.

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Sau khi phân tích kế hoạch hoạt động của chương trình, chúng ta có thể bắt đầu tạo tài nguyên:

terraform.exe apply

Sau khi gửi lệnh này, bạn sẽ lại được yêu cầu nhập số điện thoại; quay số “có” khi câu hỏi về việc thực sự thực hiện các hành động được hiển thị. Điều này sẽ cho phép bạn thiết lập toàn bộ cơ sở hạ tầng, thực hiện tất cả cấu hình cần thiết của EC2, triển khai các hàm lambda, v.v.

Sau khi tất cả tài nguyên đã được tạo thành công thông qua mã Terraform, bạn cần đi sâu vào chi tiết của ứng dụng Kinesis Analytics (rất tiếc là tôi không tìm thấy cách thực hiện việc này trực tiếp từ mã).

Chạy chương trình:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Sau này, bạn phải đặt rõ ràng tên luồng trong ứng dụng bằng cách chọn từ danh sách thả xuống:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Bây giờ mọi thứ đã sẵn sàng để đi.

Kiểm tra ứng dụng

Bất kể bạn triển khai hệ thống theo cách nào, thủ công hay thông qua mã Terraform, nó sẽ hoạt động như nhau.

Chúng tôi đăng nhập qua SSH vào máy ảo EC2 nơi Kinesis Agent được cài đặt và chạy tập lệnh api_caller.py

sudo ./api_caller.py TOKEN

Tất cả những gì bạn phải làm là đợi một tin nhắn SMS đến số của bạn:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
SMS - tin nhắn sẽ đến điện thoại sau gần 1 phút:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless
Vẫn còn phải xem liệu các bản ghi có được lưu trong cơ sở dữ liệu DynamoDB để phân tích chi tiết hơn sau này hay không. Bảng hãng hàng không_tickets chứa khoảng dữ liệu sau:

Tích hợp API Aviasale với Amazon Kinesis và tính đơn giản của serverless

Kết luận

Trong quá trình thực hiện công việc, một hệ thống xử lý dữ liệu trực tuyến đã được xây dựng dựa trên Amazon Kinesis. Các tùy chọn sử dụng Kinesis Agent kết hợp với Kinesis Data Streams và phân tích thời gian thực Kinesis Analytics bằng lệnh SQL cũng như sự tương tác của Amazon Kinesis với các dịch vụ AWS khác đã được xem xét.

Chúng tôi đã triển khai hệ thống trên theo hai cách: một cách thủ công khá dài và một cách nhanh chóng từ mã Terraform.

Tất cả mã nguồn dự án đều có sẵn trong kho GitHub của tôi, Tôi khuyên bạn nên làm quen với nó.

Tôi rất vui được thảo luận về bài viết, tôi rất mong nhận được ý kiến ​​​​của bạn. Tôi hy vọng những lời chỉ trích mang tính xây dựng.

Chúc các bạn thành công!

Nguồn: www.habr.com

Thêm một lời nhận xét