Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性

嘿哈布爾!

你喜歡開飛機嗎? 我喜歡它,但在自我隔離期間,我也愛上了分析來自一個知名資源 - Aviasales 的機票數據。

今天我們將分析 Amazon Kinesis 的工作,建立具有即時分析功能的串流系統,安裝 Amazon DynamoDB NoSQL 資料庫作為主要資料存儲,並為有興趣的工單設定簡訊通知。

所有的細節都在切割之下! 去!

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性

介紹

例如,我們需要訪問 航空銷售 API。 存取它是免費且沒有限制的;您只需在「開發者」部分註冊即可接收您的 API 令牌來存取資料。

本文的主要目的是讓大家對AWS中資訊流的使用有一個大致的了解;我們考慮到所使用的API返回的資料並不是嚴格最新的,並且是從快取中傳輸的,這是根據過去48 小時內Aviasales.ru 和Jetradar.com 網站的使用者搜尋而形成。

Kinesis-agent 安裝在生產機器上,透過 API 接收,將透過 Kinesis Data Analytics 自動解析資料並將資料傳輸到所需的流。 該流的原始版本將直接寫入儲存。 DynamoDB 中部署的原始資料儲存將允許透過 BI 工具(例如 AWS Quick Sight)進行更深入的票證分析。

我們將考慮部署整個基礎架構的兩種選擇:

  • 手動 - 透過 AWS 管理主控台;
  • Terraform 程式碼的基礎架構適用於懶惰的自動化人員;

開發系統的架構

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
使用的組件:

  • 航空銷售 API ——該API傳回的資料將用於後續所有工作;
  • EC2 生產者實例 — 雲端中的常規虛擬機,將在其中產生輸入資料流:
    • 運動劑 是一個本地安裝在電腦上的 Java 應用程序,它提供了一種收集資料並將其發送到 Kinesis(Kinesis Data Streams 或 Kinesis Firehose)的簡單方法。 代理程式不斷監控指定目錄中的一組檔案並將新資料傳送到 Kinesis;
    • API呼叫者腳本 — 一個 Python 腳本,向 API 發出請求並將回應放入由 Kinesis Agent 監控的資料夾中;
  • Kinesis 數據流 — 具有廣泛擴展能力的即時資料流服務;
  • 運動分析 是一種無伺服器服務,可簡化即時串流資料分析。 Amazon Kinesis Data Analytics 配置應用程式資源並自動擴展以處理任意數量的傳入資料;
  • AWS Lambda — 一項可讓您執行程式碼而無需備份或設定伺服器的服務。 所有運算能力都會針對每次呼叫自動縮放;
  • 亞馬遜DynamoDB - 鍵值對和文件的資料庫,在任何規模運行時延遲均小於 10 毫秒。 使用 DynamoDB 時,您無需預置、修補或管理任何伺服器。 DynamoDB 自動擴充表以調整可用資源量並保持高效能。 無需系統管理;
  • 亞馬遜SNS - 使用發布者-訂閱者 (Pub/Sub) 模型發送訊息的完全託管服務,您可以使用該模型隔離微服務、分散式系統和無伺服器應用程式。 SNS 可用於透過行動推播通知、簡訊和電子郵件向最終用戶發送訊息。

初步培訓

為了模擬資料流,我決定使用 Aviasales API 傳回的機票資訊。 在 文件 不同方法的清單相當廣泛,讓我們選擇其中一種 - “每月價格日曆”,它返回每月每一天的價格,並按轉帳數量分組。 如果您未在請求中指定搜尋月份,則會傳回目前月份的下一個月的資訊。

那麼,讓我們註冊並獲取我們的令牌。

請求示例如下:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

上述透過在請求中指定令牌來從 API 接收資料的方法是可行的,但我更喜歡透過標頭傳遞存取令牌,因此我們將在 api_caller.py 腳本中使用此方法。

回答範例:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

上面的範例 API 回應顯示了從聖彼得堡到 Phuk 的機票...哦,多棒的夢想...
由於我來自喀山,而普吉島現在“只是一個夢想”,讓我們尋找從聖彼得堡到喀山的機票。

它假設您已經擁有一個 AWS 帳戶。 我想立即特別注意以下事實:Kinesis 和透過 SMS 發送通知不包含在年度報告中 免費套餐(免費使用)。 但即便如此,只要花幾美元,就很有可能建立提議的系統並使用它。 當然,不要忘記刪除不再需要的所有資源。

幸運的是,如果我們達到每月的免費限制,DynamoDb 和 lambda 函數將對我們免費。 例如,對於 DynamoDB:25 GB 儲存、25 WCU/RCU 和 100 億個查詢。 每月有一百萬次 lambda 函數呼叫。

手動系統部署

設定 Kinesis 資料流

讓我們轉到 Kinesis Data Streams 服務並建立兩個新流,每個流一個分片。

什麼是分片?
分片是 Amazon Kinesis 流的基本資料傳輸單元。 一個段以 1 MB/s 的速度提供輸入資料傳輸,以 2 MB/s 的速度提供輸出資料傳輸。 一個段落每秒最多支援 1000 個 PUT 條目。 建立資料流時,需要指定所需的段數。 例如,您可以建立包含兩個段的資料流。 此資料流將以 2 MB/s 的速度提供輸入資料傳輸,以 4 MB/s 的速度提供輸出資料傳輸,支援每秒多達 2000 個 PUT 記錄。

流中的分片越多,其吞吐量就越大。 原則上,這就是透過添加分片來擴展流量的方式。 但擁有的碎片越多,價格就越高。 每個分片每小時花費 1,5 美分,每百萬 PUT 有效負載單位額外花費 1.4 美分。

讓我們建立一個名為的新串流 機票,1 個碎片對他來說就足夠了:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
現在讓我們建立另一個線程,名稱為 特殊串流:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性

製作人設定

要分析任務,使用常規 EC2 執行個體作為資料生產者就足夠了。 它不一定是功能強大、昂貴的虛擬機器;t2.micro 就可以了。

重要提示:例如,您應該使用鏡像 - Amazon Linux AMI 2018.03.0,它用於快速啟動 Kinesis Agent 的設定較少。

前往 EC2 服務,建立一個新虛擬機,選擇類型為 t2.micro 的所需 AMI,該 AMI 包含在免費套餐中:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
為了使新建立的虛擬機器能夠與 Kinesis 服務交互,必須為其授予執行此操作的權限。 執行此操作的最佳方法是指派 IAM 角色。 因此,在步驟 3:設定實例詳細資料畫面上,您應該選擇 建立新的 IAM 角色:

為 EC2 建立 IAM 角色
Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
在開啟的視窗中,選擇我們正在為 EC2 建立新角色,然後轉到權限部分:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
使用訓練範例,我們不必深入了解資源權限的精細配置的所有複雜性,因此我們將選擇 Amazon 預先配置的策略:AmazonKinesisFullAccess 和 CloudWatchFullAccess。

讓我們為此角色指定一些有意義的名稱,例如:EC2-KinesisStreams-FullAccess。 結果應該與下圖所示相同:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
創建這個新角色後,不要忘記將其附加到創建的虛擬機器實例:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
我們不更改此畫面上的任何其他內容,然後轉到下一個視窗。

硬碟設定以及標籤可以保留為預設值(儘管使用標籤是一種很好的做法,但至少為實例指定一個名稱並指示環境)。

現在我們進入第 6 步:設定安全群組標籤,您需要在其中建立一個新安全性群組或指定現有安全性群組,這樣您就可以透過 ssh(連接埠 22)連線到執行個體。 在那裡選擇“來源”->“我的 IP”,您可以啟動該執行個體。

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
一旦它切換到運行狀態,您就可以嘗試透過 ssh 連線到它。

為了能夠使用 Kinesis Agent,成功連接到電腦後,您必須在終端機中輸入以下命令:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

讓我們建立一個資料夾來保存 API 回應:

sudo mkdir /var/log/airline_tickets

在啟動代理之前,您需要配置其配置:

sudo vim /etc/aws-kinesis/agent.json

agent.json 檔案的內容應如下所示:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

從設定檔中可以看出,代理程式將監視/var/log/airline_tickets/目錄中擴展名為.log的文件,解析它們並將它們傳輸到airline_tickets流。

我們重新啟動服務並確保它已啟動並運行:

sudo service aws-kinesis-agent restart

現在讓我們下載將從 API 請求資料的 Python 腳本:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py 腳本從 Aviasales 請求數據,並將收到的回應保存在 Kinesis 代理掃描的目錄中。 這個腳本的實作非常標準,有一個 TicketsApi 類,它允許您非同步拉取 API。 我們將帶有令牌的標頭和請求參數傳遞給此類:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

為了測試代理程式的正確設定和功能,讓我們測試執行 api_caller.py 腳本:

sudo ./api_caller.py TOKEN

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
我們在 Agent 日誌和 Airlines_tickets 資料流的「Monitoring」標籤中查看工作結果:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
如您所見,一切正常,Kinesis Agent 成功將資料傳送到串流。 現在讓我們來配置消費者。

設定 Kinesis Data Analytics

讓我們繼續討論整個系統的核心元件 - 在 Kinesis Data Analytics 中建立一個名為 kinesis_analytics_airlines_app 的新應用程式:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
Kinesis Data Analytics 可讓您使用 SQL 語言從 Kinesis Streams 執行即時數據分析。 它是一項完全自動擴充的服務(與 Kinesis Streams 不同):

  1. 允許您根據對來源資料的請求建立新的流(輸出流);
  2. 提供應用程式運行時發生的錯誤的流(錯誤流);
  3. 可自動確定輸入資料方案(必要時可手動重新定義)。

這不是一項便宜的服務 - 每小時 0.11 美元,因此您應該謹慎使用它,並在完成後將其刪除。

讓我們將應用程式連接到資料來源:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
選擇我們要連接的流(airline_tickets):

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
接下來,您需要附加一個新的 IAM 角色,以便應用程式可以從流中讀取資料並向流寫入資料。 為此,無需更改存取權限區塊中的任何內容就足夠了:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
現在讓我們請求發現流中的資料模式;為此,請點擊「發現模式」按鈕。 因此,IAM 角色將被更新(將創建一個新角色),並且將從已到達流中的資料啟動模式檢測:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
現在您需要轉到 SQL 編輯器。 當您單擊此按鈕時,將出現一個窗口,要求您啟動應用程式 - 選擇您要啟動的內容:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
將以下簡單查詢插入 SQL 編輯器窗口,然後按一下「儲存並執行 SQL」:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

在關聯式資料庫中,您可以使用 INSERT 語句來新增記錄並使用 SELECT 語句來查詢資料。 在 Amazon Kinesis Data Analytics 中,您可以使用流 (STREAM) 和幫浦 (PUMP),即連續插入請求,將應用程式中的一個流中的資料插入到另一個流中。

上面提供的 SQL 查詢搜尋價格低於 XNUMX 盧布的 Aeroflot 機票。 所有符合這些條件的記錄都將放置在 DESTINATION_SQL_STREAM 流中。

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
在 Destination 區塊中,選擇special_stream 流,然後在應用程式內流名稱 DESTINATION_SQL_STREAM 下拉清單中:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
所有操作的結果應該類似下圖:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性

建立並訂閱 SNS 主題

前往簡單通知服務並在那裡建立一個名為 Airlines 的新主題:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
訂閱此主題並指定將向其發送簡訊通知的手機號碼:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性

在 DynamoDB 中建立表

為了儲存 Airlines_tickets 流中的原始數據,我們在 DynamoDB 中建立一個具有相同名稱的表。 我們將使用 record_id 作為主鍵:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性

建立 lambda 函數收集器

讓我們建立一個名為 Collector 的 lambda 函數,其任務是輪詢airline_tickets 流,如果在那裡找到新記錄,則將這些記錄插入 DynamoDB 表中。 顯然,除了預設權限之外,此 lambda 還必須具有對 Kinesis 資料流的讀取存取權和對 DynamoDB 的寫入存取權。

為收集器 lambda 函數建立 IAM 角色
首先,我們為 lambda 建立一個名為 Lambda-TicketsProcessingRole 的新 IAM 角色:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
對於測試範例,預先設定的 AmazonKinesisReadOnlyAccess 和 AmazonDynamoDBFullAccess 策略非常合適,如下圖所示:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性

當新條目進入airline_stream 時,該 lambda 應該由 Kinesis 的觸發器啟動,因此我們需要新增一個觸發器:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
剩下的就是插入程式碼並保存 lambda。

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

建立 lambda 函數通知程序

第二個 lambda 函數將監視第二個流 (special_stream) 並向 SNS 發送通知,以類似的方式建立。 因此,該 lambda 必須有權從 Kinesis 讀取資料並將訊息發送到給定的 SNS 主題,然後 SNS 服務將這些訊息發送給該主題的所有訂閱者(電子郵件、簡訊等)。

建立 IAM 角色
首先,我們為此 lambda 建立 IAM 角色 Lambda-KinesisAlarm,然後將此角色指派給正在建立的 Alarm_notifier lambda:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性

此 lambda 應該會作用於新記錄進入special_stream 的觸發器,因此您需要以與 Collector lambda 相同的方式配置觸發器。

為了更輕鬆地配置此 lambda,我們引入一個新的環境變數 - TOPIC_ARN,其中放置 Airlines 主題的 ANR(Amazon 資源名稱):

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
然後插入 lambda 程式碼,一點也不複雜:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

看來手動系統配置到此就完成了。 剩下的就是測試並確保我們已正確配置所有內容。

從 Terraform 程式碼部署

所需準備

Terraform 是一個非常方便的開源工具,用於從程式碼部署基礎架構。 它有自己的語法,易於學習,並且有許多關於如何部署和部署什麼的範例。 Atom 編輯器或 Visual Studio Code 具有許多方便的插件,讓使用 Terraform 變得更加容易。

您可以下載發行版 。 對所有 Terraform 功能的詳細分析超出了本文的範圍,因此我們將僅限於要點。

如何開始

項目完整代碼為 在我的儲存庫中。 我們將存儲庫克隆到我們自己。 在開始之前,您需要確保已安裝並配置 AWS CLI,因為... Terraform 會在 ~/.aws/credentials 檔案中尋找憑證。

一個好的做法是在部署整個基礎架構之前執行 plan 命令,以查看 Terraform 目前在雲端中為我們創建的內容:

terraform.exe plan

系統將提示您輸入用於發送通知的電話號碼。 此階段無需進入。

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
分析完程式的運行計劃,我們就可以開始建立資源了:

terraform.exe apply

發送此命令後,系統將再次要求您輸入電話號碼;當顯示有關實際執行操作的問題時,請撥打「是」。 這將允許您設定整個基礎架構、執行 EC2 的所有必要配置、部署 lambda 函數等。

透過 Terraform 程式碼成功建立所有資源後,您需要深入了解 Kinesis Analytics 應用程式的詳細資訊(不幸的是,我沒有找到如何直接從程式碼中執行此操作)。

啟動應用程式:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
此後,您必須透過從下拉清單中選擇來明確設定應用程式內流名稱:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
現在一切準備就緒。

測試應用程式

無論您如何部署系統,手動還是透過 Terraform 程式碼,它的工作原理都是一樣的。

我們透過 SSH 登入安裝了 Kinesis Agent 的 EC2 虛擬機器並執行 api_caller.py 腳本

sudo ./api_caller.py TOKEN

您所要做的就是等待簡訊發送到您的號碼:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
SMS - 一則訊息在大約 1 分鐘內到達手機:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性
仍有待觀察這些記錄是否保存在 DynamoDB 資料庫中,以便進行後續更詳細的分析。 Airlines_tickets 表大約包含以下資料:

Aviasales API 與 Amazon Kinesis 整合並實現無伺服器簡單性

結論

在工作過程中,基於Amazon Kinesis建立了線上資料處理系統。 考慮了將 Kinesis Agent 與 Kinesis Data Streams 和使用 SQL 命令的即時分析 Kinesis Analytics 結合使用的選項,以及 Amazon Kinesis 與其他 AWS 服務的交互作用。

我們透過兩種方式部署上述系統:一種是相當長的手動方式,另一種是透過 Terraform 程式碼快速部署。

所有項目原始碼均可用 在我的 GitHub 儲存庫中,我建議你熟悉一下。

我很高興討論這篇文章,期待您的評論。 我希望得到建設性的批評。

祝你成功!

來源: www.habr.com

添加評論