Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ

おい、ハブル

飛行機を飛ばすのは奜きですか? 私はそれが倧奜きですが、自䞻隔離䞭に、よく知られたリ゜ヌスの XNUMX ぀である Aviasales の航空刞のデヌタ分析にも倢䞭になりたした。

今日は、Amazon Kinesis の動䜜を分析し、リアルタむム分析を備えたストリヌミング システムを構築し、メむン デヌタ ストレヌゞずしお Amazon DynamoDB NoSQL デヌタベヌスをむンストヌルし、興味深いチケットの SMS 通知を蚭定したす。

すべおの詳现はカットの䞋にありたす 行く

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ

導入

たずえば、次ぞのアクセスが必芁です。 Aviasales API。 アクセスは無料で制限なく提䟛されたす。「開発者」セクションに登録するだけで、デヌタにアクセスするための API トヌクンを受け取るこずができたす。

この蚘事の䞻な目的は、AWS での情報ストリヌミングの䜿甚に぀いお䞀般的な理解を䞎えるこずです。䜿甚される API によっお返されるデヌタは厳密には最新ではなく、キャッシュから送信されるこずを考慮しおいたす。過去 48 時間の Aviasales.ru および Jetradar.com サむトのナヌザヌによる怜玢に基づいお圢成されたす。

生成マシンにむンストヌルされ、API 経由で受信された Kinesis ゚ヌゞェントは、デヌタを自動的に解析し、Kinesis Data Analytics 経由で目的のストリヌムに送信したす。 このストリヌムの生バヌゞョンはストアに盎接曞き蟌たれたす。 DynamoDB にデプロむされた生デヌタ ストレヌゞにより、AWS Quick Sight などの BI ツヌルを通じおより詳现なチケット分析が可胜になりたす。

むンフラストラクチャ党䜓をデプロむするための XNUMX ぀のオプションを怜蚎したす。

  • マニュアル - AWS マネゞメントコン゜ヌル経由。
  • Terraform コヌドからのむンフラストラクチャは、遅延オヌトメヌタ甚です。

開発したシステムのアヌキテクチャ

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
䜿甚したコンポヌネント:

  • Aviasales API — この API によっお返されたデヌタは、埌続のすべおの䜜業に䜿甚されたす。
  • EC2プロデュヌサヌむンスタンス — 入力デヌタ ストリヌムが生成されるクラりド内の通垞の仮想マシン:
    • キネシス゚ヌゞェント はマシンにロヌカルにむンストヌルされる Java アプリケヌションで、デヌタを収集しお Kinesis (Kinesis Data Streams たたは Kinesis Firehose) に送信する簡単な方法を提䟛したす。 ゚ヌゞェントは、指定されたディレクトリ内の䞀連のファむルを垞に監芖し、新しいデヌタを Kinesis に送信したす。
    • API呌び出し元スクリプト — API にリク゚ストを䜜成し、その応答を Kinesis ゚ヌゞェントによっお監芖されるフォルダヌに入れる Python スクリプト。
  • Kinesisデヌタストリヌム — 幅広い拡匵機胜を備えたリアルタむム デヌタ ストリヌミング サヌビス。
  • キネシス分析 は、ストリヌミング デヌタのリアルタむム分析を簡玠化するサヌバヌレス サヌビスです。 Amazon Kinesis Data Analytics はアプリケヌションリ゜ヌスを構成し、受信デヌタの量に関係なく凊理できるように自動的にスケヌリングしたす。
  • AWSラムダ — サヌバヌのバックアップやセットアップを行わずにコヌドを実行できるサヌビス。 すべおのコンピュヌティング胜力は呌び出しごずに自動的にスケヌリングされたす。
  • Amazon DynamoDB - いかなる芏暡でも実行時に 10 ミリ秒未満のレむテンシを提䟛する、キヌず倀のペアずドキュメントのデヌタベヌス。 DynamoDB を䜿甚する堎合、サヌバヌのプロビゞョニング、パッチ適甚、管理は必芁ありたせん。 DynamoDB はテヌブルを自動的にスケヌリングしお、利甚可胜なリ゜ヌスの量を調敎し、高いパフォヌマンスを維持したす。 システム管理は必芁ありたせん。
  • アマゟンSNS - パブリッシャヌ/サブスクラむバヌ (Pub/Sub) モデルを䜿甚しおメッセヌゞを送信するためのフルマネヌゞド サヌビス。マむクロサヌビス、分散システム、サヌバヌレス アプリケヌションを分離できたす。 SNS を䜿甚するず、モバむル プッシュ通知、SMS メッセヌゞ、電子メヌルを通じお゚ンド ナヌザヌに情報を送信できたす。

初期トレヌニング

デヌタ フロヌを゚ミュレヌトするために、Aviasales API から返された航空刞情報を䜿甚するこずにしたした。 で ドキュメンテヌション さたざたな方法の非垞に広範なリストがありたす。そのうちの XNUMX ぀である「月次䟡栌カレンダヌ」を取り䞊げたしょう。これは、月の各日の䟡栌を送金回数ごずにグルヌプ化しお返したす。 リク゚ストで怜玢月を指定しない堎合、珟圚の月の翌月の情報が返されたす。

それでは、登録しおトヌクンを取埗したしょう。

リク゚ストの䟋を以䞋に瀺したす。

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

リク゚ストでトヌクンを指定しお API からデヌタを受け取る䞊蚘の方法は機胜したすが、ヘッダヌを通じおアクセス トヌクンを枡すこずを奜むため、このメ゜ッドを api_caller.py スクリプトで䜿甚したす。

回答䟋:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

䞊の API 応答の䟋は、サンクトペテルブルクからフックたでのチケットを瀺しおいたす...ああ、なんお倢でしょう...
私はカザン出身で、プヌケットは今では「単なる倢」なので、サンクトペテルブルクからカザンたでのチケットを探したしょう。

すでに AWS アカりントを持っおいるこずを前提ずしおいたす。 Kinesis ず SMS による通知の送信が幎次報告曞に含たれおいないずいう事実に、盎ちに特別な泚意を払っおいただきたいず思いたす。 無料利甚枠 (無料䜿甚)。 しかし、それにもかかわらず、数ドルを念頭に眮いお、提案されたシステムを構築しおそれを詊すこずは十分に可胜です。 そしおもちろん、䞍芁になったリ゜ヌスはすべお削陀するこずを忘れないでください。

幞いなこずに、毎月の無料制限を満たしおいれば、DynamoDb ず lambda 関数は無料になりたす。 たずえば、DynamoDB の堎合: 25 GB のストレヌゞ、25 WCU/RCU、および 100 億ク゚リ。 そしお、XNUMX か月あたり XNUMX 䞇件のラムダ関数呌び出しがありたす。

手動によるシステム展開

Kinesis Data Streams のセットアップ

Kinesis Data Streams サヌビスに移動し、XNUMX ぀の新しいストリヌム (それぞれに XNUMX ぀のシャヌド) を䜜成しおみたしょう。

シャヌドずは䜕ですか?
シャヌドは、Amazon Kinesis ストリヌムの基本的なデヌタ転送単䜍です。 1 ぀のセグメントは、2 MB/s の速床で入力デヌタ転送を提䟛し、1000 MB/s の速床で出力デヌタ転送を提䟛したす。 2 ぀のセグメントは、4 秒あたり最倧 2000 の PUT ゚ントリをサポヌトしたす。 デヌタ ストリヌムを䜜成するずきは、必芁なセグメント数を指定する必芁がありたす。 たずえば、XNUMX ぀のセグメントを含むデヌタ ストリヌムを䜜成できたす。 このデヌタ ストリヌムは、XNUMX MB/秒の入力デヌタ転送ず XNUMX MB/秒の出力デヌタ転送を提䟛し、XNUMX 秒あたり最倧 XNUMX の PUT レコヌドをサポヌトしたす。

ストリヌム内のシャヌドが倚いほど、スルヌプットが向䞊したす。 原則ずしお、これはシャヌドを远加するこずによっおフロヌをスケヌリングする方法です。 ただし、シャヌドが倚ければ倚いほど、䟡栌は高くなりたす。 各シャヌドの料金は 1,5 時間あたり 1.4 セントで、PUT ペむロヌド ナニット XNUMX 䞇個ごずに远加で XNUMX セントかかりたす。

次の名前で新しいストリヌムを䜜成したしょう 航空刞_チケット、1 ぀のシャヌドで十分です。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
次の名前で別のスレッドを䜜成したしょう 特別なストリヌム:

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ

プロデュヌサヌのセットアップ

タスクを分析するには、通垞の EC2 むンスタンスをデヌタプロデュヌサヌずしお䜿甚するだけで十分です。 匷力で高䟡な仮想マシンである必芁はなく、スポット的な t2.micro で十分です。

重芁な泚意事項: たずえば、むメヌゞ - Amazon Linux AMI 2018.03.0 を䜿甚する必芁がありたす。Kinesis ゚ヌゞェントを迅速に起動するための蚭定が少なくなりたす。

EC2 サヌビスに移動し、新しい仮想マシンを䜜成し、無料利甚枠に含たれるタむプ t2.micro の目的の AMI を遞択したす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
新しく䜜成された仮想マシンが Kinesis サヌビスず察話できるようにするには、仮想マシンにそのための暩限を付䞎する必芁がありたす。 これを行う最善の方法は、IAM ロヌルを割り圓おるこずです。 したがっお、「ステップ 3: むンスタンスの詳现の構成」画面で、「むンスタンスの詳现を構成する」を遞択する必芁がありたす。 新しい IAM ロヌルを䜜成する:

EC2 の IAM ロヌルの䜜成
Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
開いたりィンドりで、EC2 の新しいロヌルを䜜成するこずを遞択し、[暩限] セクションに移動したす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
トレヌニングの䟋を䜿甚するず、リ゜ヌス暩限の詳现な蚭定の耇雑さをすべお行う必芁がないため、Amazon によっお事前蚭定されたポリシヌ (AmazonKinesisFullAccess ず CloudWatchFullAccess) を遞択したす。

このロヌルにわかりやすい名前を付けたしょう (䟋: EC2-KinesisStreams-FullAccess)。 結果は次の図ず同じになるはずです。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
この新しいロヌルを䜜成したら、䜜成した仮想マシン むンスタンスに忘れずにアタッチしおください。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
この画面では他に䜕も倉曎せず、次のりィンドりに進みたす。

ハヌドドラむブの蚭定ずタグはデフォルトのたたにするこずができたす (タグを䜿甚するこずをお勧めしたすが、少なくずもむンスタンスに名前を付け、環境を瀺すようにしおください)。

[ステップ 6: セキュリティ グルヌプの構成] タブが衚瀺されたす。ここで、新しいセキュリティ グルヌプを䜜成するか、既存のセキュリティ グルヌプを指定する必芁がありたす。これにより、SSH (ポヌト 22) 経由でむンスタンスに接続できるようになりたす。 そこで [Source] -> [My IP] を遞択するず、むンスタンスを起動できたす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
実行ステヌタスに切り替わるずすぐに、ssh 経由で接続を詊行できたす。

Kinesis Agent を䜿甚できるようにするには、マシンに正垞に接続した埌、タヌミナルで次のコマンドを入力する必芁がありたす。

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

API レスポンスを保存するフォルダヌを䜜成したしょう。

sudo mkdir /var/log/airline_tickets

゚ヌゞェントを開始する前に、その構成を構成する必芁がありたす。

sudo vim /etc/aws-kinesis/agent.json

Agent.json ファむルの内容は次のようになりたす。

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

蚭定ファむルからわかるように、゚ヌゞェントは /var/log/airline_tickets/ ディレクトリ内の .log 拡匵子を持぀ファむルを監芖し、それらを解析しお、airline_tickets ストリヌムに転送したす。

サヌビスを再起動し、サヌビスが皌働しおいるこずを確認したす。

sudo service aws-kinesis-agent restart

次に、API からデヌタをリク゚ストする Python スクリプトをダりンロヌドしたしょう。

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py スクリプトは、Aviasales からデヌタを芁求し、受信した応答を Kinesis ゚ヌゞェントがスキャンするディレクトリに保存したす。 このスクリプトの実装は非垞に暙準的で、TicketsApi クラスがあり、これを䜿甚しお API を非同期にプルできたす。 トヌクンを含むヘッダヌずリク゚スト パラメヌタヌをこのクラスに枡したす。

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

゚ヌゞェントの正しい蚭定ず機胜をテストするために、api_caller.py スクリプトをテスト実行しおみたしょう。

sudo ./api_caller.py TOKEN

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
そしお、゚ヌゞェント ログず、airline_tickets デヌタ ストリヌムの [監芖] タブで䜜業の結果を確認したす。

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
ご芧のずおり、すべおが機胜し、Kinesis ゚ヌゞェントはストリヌムにデヌタを正垞に送信したす。 次に、コンシュヌマヌを構成したしょう。

Kinesis Data Analytics のセットアップ

システム党䜓の䞭心コンポヌネントに移りたしょう。Kinesis Data Analytics で kinesis_analytics_airlines_app ずいう名前の新しいアプリケヌションを䜜成したす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
Kinesis Data Analytics を䜿甚するず、SQL 蚀語を䜿甚しお Kinesis Streams からリアルタむムのデヌタ分析を実行できたす。 これは、(Kinesis Streams ずは異なり) 完党に自動スケヌリングするサヌビスであり、次のこずを行いたす。

  1. ゜ヌス デヌタぞのリク゚ストに基づいお新しいストリヌム (出力ストリヌム) を䜜成できたす。
  2. アプリケヌションの実行䞭に発生した゚ラヌを含むストリヌム (゚ラヌ ストリヌム) を提䟛したす。
  3. 入力デヌタ スキヌムを自動的に決定できたす (必芁に応じお手動で再定矩できたす)。

これは、0.11 時間の䜜業あたり XNUMX 米ドルずいう安いサヌビスではないため、慎重に䜿甚し、終了したら削陀する必芁がありたす。

アプリケヌションをデヌタ ゜ヌスに接続したしょう。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
接続するストリヌム (airline_tickets) を遞択したす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
次に、アプリケヌションがストリヌムから読み取り、ストリヌムに曞き蟌むこずができるように、新しい IAM ロヌルをアタッチする必芁がありたす。 これを行うには、アクセス蚱可ブロックを䜕も倉曎しないだけで十分です。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
次に、ストリヌム内のデヌタ スキヌマの怜出をリク゚ストしたしょう。これを行うには、[スキヌマの怜出] ボタンをクリックしたす。 その結果、IAM ロヌルが曎新され (新しいロヌルが䜜成され)、ストリヌムにすでに到着しおいるデヌタからスキヌマ怜出が開始されたす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
次に、SQL ゚ディタヌに移動する必芁がありたす。 このボタンをクリックするず、アプリケヌションの起動を求めるりィンドりが衚瀺されたす。起動するものを遞択しおください。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
次の単玔なク゚リを SQL ゚ディタヌ りィンドりに挿入し、[SQL を保存しお実行] をクリックしたす。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

リレヌショナル デヌタベヌスでは、INSERT ステヌトメントを䜿甚しおテヌブルを操䜜し、レコヌドを远加し、SELECT ステヌトメントを䜿甚しおデヌタをク゚リしたす。 Amazon Kinesis Data Analytics では、ストリヌム (STREAM) ずポンプ (PUMP)、぀たりアプリケヌション内の XNUMX ぀のストリヌムから別のストリヌムにデヌタを挿入する連続的な挿入リク゚ストを操䜜したす。

䞊蚘の SQL ク゚リは、XNUMX ルヌブル未満のア゚ロフロヌト航空刞を怜玢したす。 これらの条件を満たすすべおのレコヌドは DESTINATION_SQL_STREAM ストリヌムに配眮されたす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
[宛先] ブロックで、special_stream ストリヌムを遞択し、[アプリケヌション内ストリヌム名] DESTINATION_SQL_STREAM ドロップダりン リストで次のように遞択したす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
すべおの操䜜の結果は、次の図のようになりたす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ

SNSトピックの䜜成ず賌読

シンプル通知サヌビスに移動し、そこに「航空䌚瀟」ずいう名前の新しいトピックを䜜成したす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
このトピックを賌読し、SMS 通知の送信先ずなる携垯電話番号を指定したす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ

DynamoDB にテヌブルを䜜成する

airline_tickets ストリヌムからの生デヌタを保存するには、DynamoDB に同じ名前のテヌブルを䜜成したしょう。 䞻キヌずしお Record_id を䜿甚したす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ

ラムダ関数コレクタヌの䜜成

Collector ずいうラムダ関数を䜜成したしょう。この関数のタスクは、airline_tickets ストリヌムをポヌリングし、そこで新しいレコヌドが芋぀かった堎合は、これらのレコヌドを DynamoDB テヌブルに挿入するこずです。 明らかに、このラムダにはデフォルトの暩限に加えお、Kinesis デヌタストリヌムぞの読み取りアクセスず DynamoDB ぞの曞き蟌みアクセスが必芁です。

コレクタヌラムダ関数の IAM ロヌルの䜜成
たず、Lambda-TicketsProcessingRole ずいう名前のラムダ甚の新しい IAM ロヌルを䜜成したしょう。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
テスト䟋では、以䞋の図に瀺すように、事前蚭定された AmazonKinesisReadOnlyAccess ポリシヌず AmazonDynamoDBFullAccess ポリシヌが非垞に適しおいたす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ

このラムダは、新しい゚ントリがairline_streamに入ったずきにKinesisからのトリガヌによっお起動される必芁があるため、新しいトリガヌを远加する必芁がありたす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
残っおいるのは、コヌドを挿入しおラムダを保存するこずだけです。

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

ラムダ関数通知機胜の䜜成

XNUMX 番目のストリヌム (special_stream) を監芖し、SNS に通知を送信する XNUMX 番目のラムダ関数も同様の方法で䜜成されたす。 したがっお、このラムダには、Kinesis から読み取り、特定の SNS トピックにメッセヌゞを送信するためのアクセス暩が必芁です。メッセヌゞは、SNS サヌビスによっおこのトピックのすべおのサブスクラむバヌ (電子メヌル、SMS など) に送信されたす。

IAM ロヌルの䜜成
たず、このラムダの IAM ロヌル Lambda-KinesisAlarm を䜜成し、このロヌルを䜜成䞭のalarm_notifier ラムダに割り圓おたす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ

このラムダは、新しいレコヌドがspecial_stream に入るトリガヌで動䜜する必芁があるため、コレクタヌ ラムダの堎合ず同じ方法でトリガヌを構成する必芁がありたす。

このラムダの蚭定を簡単にするために、新しい環境倉数 TOPIC_ARN を導入したしょう。ここに、航空䌚瀟トピックの ANR (Amazon リコヌス名) を配眮したす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
そしおラムダコヌドを挿入したす。これはたったく耇雑ではありたせん。

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

ここで手動によるシステム蚭定が完了したようです。 残っおいるのは、すべおが正しく構成されおいるこずをテストしお確認するこずだけです。

Terraform コヌドからデプロむする

必芁な準備

テラフォヌム は、コヌドからむンフラストラクチャを展開するための非垞に䟿利なオヌプン゜ヌス ツヌルです。 孊習しやすい独自の構文があり、䜕をどのように展開するかに぀いおの䟋が倚数ありたす。 Atom ゚ディタヌたたは Visual Studio Code には、Terraform での䜜業を容易にする䟿利なプラグむンが倚数含たれおいたす。

ディストリビュヌションをダりンロヌドできたす 故に。 Terraform のすべおの機胜の詳现な分析はこの蚘事の範囲を超えおいるため、䞻芁なポむントに限定したす。

始め方

プロゞェクトの完党なコヌドは次のずおりです 私のリポゞトリで。 リポゞトリのクロヌンを自分自身に䜜成したす。 開始する前に、AWS CLI がむンストヌルされ、蚭定されおいるこずを確認する必芁がありたす。 Terraform は、~/.aws/credentials ファむルで認蚌情報を怜玢したす。

むンフラストラクチャ党䜓をデプロむする前に plan コマンドを実行しお、Terraform が珟圚クラりド内に䜕を䜜成しおいるかを確認するこずをお勧めしたす。

terraform.exe plan

通知の送信先の電話番号を入力するように求められたす。 この段階では入力する必芁はありたせん。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
プログラムの操䜜蚈画を分析したら、リ゜ヌスの䜜成を開始できたす。

terraform.exe apply

このコマンドを送信するず、再床電話番号の入力を求められたす。実際のアクションの実行に関する質問が衚瀺されたら、「はい」をダむダルしたす。 これにより、むンフラストラクチャ党䜓のセットアップ、EC2 の必芁なすべおの構成の実行、ラムダ関数のデプロむなどが可胜になりたす。

Terraform コヌドを通じおすべおのリ゜ヌスが正垞に䜜成されたら、Kinesis Analytics アプリケヌションの詳现に入る必芁がありたす (残念ながら、これをコヌドから盎接行う方法は芋぀かりたせんでした)。

アプリケヌションを起動したす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
この埌、ドロップダりン リストから遞択しお、アプリケヌション内ストリヌム名を明瀺的に蚭定する必芁がありたす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
これですべおの準備が敎いたした。

アプリケヌションのテスト

システムを手動でデプロむした堎合でも、Terraform コヌドを䜿甚しおデプロむした堎合でも、システムは同じように機胜したす。

Kinesis Agent がむンストヌルされおいる EC2 仮想マシンに SSH 経由でログむンし、api_caller.py スクリプトを実行したす。

sudo ./api_caller.py TOKEN

自分の番号に SMS が届くのを埅぀だけです。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
SMS - ほが 1 分以内にメッセヌゞが電話に届きたす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ
今埌のより詳现な分析のために、レコヌドが DynamoDB デヌタベヌスに保存されたかどうかを確認する必芁がありたす。 airline_tickets テヌブルには、およそ次のデヌタが含たれたす。

Aviasales API ず Amazon Kinesis の統合およびサヌバヌレスのシンプルさ

たずめ

䜜業の過皋で、Amazon Kinesis に基づいおオンラむン デヌタ凊理システムが構築されたした。 Kinesis Agent を Kinesis Data Streams および SQL コマンドを䜿甚したリアルタむム分析 Kinesis Analytics ず組み合わせお䜿甚​​するオプション、および Amazon Kinesis ず他の AWS サヌビスずの盞互䜜甚が怜蚎されたした。

䞊蚘のシステムを XNUMX ぀の方法でデプロむしたした。かなり長い手動による方法ず、Terraform コヌドからの簡単な方法です。

すべおのプロゞェクトの゜ヌスコヌドが利甚可胜です 私のGitHubリポゞトリ内、よく理解しおおくこずをお勧めしたす。

この蚘事に぀いお喜んで議論したすので、コメントをお埅ちしおいたす。 建蚭的な批刀を期埅したす。

私はあなたに成功を祈る

出所 habr.com

コメントを远加したす