Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè

Hey Habr!

Ou renmen vole avyon? Mwen renmen li, men pandan oto-izolasyon mwen menm tou mwen te tonbe nan renmen ak analize done sou tikè avyon ki soti nan yon resous byen li te ye - Aviasales.

Jodi a nou pral analize travay Amazon Kinesis, bati yon sistèm difizyon ak analiz an tan reyèl, enstale baz done Amazon DynamoDB NoSQL kòm depo done prensipal la, epi mete notifikasyon SMS pou tikè enteresan.

Tout detay yo anba koupe a! Ale!

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè

Entwodiksyon

Pou egzanp, nou bezwen aksè a Aviasales API. Aksè nan li gratis epi san restriksyon; ou jis bezwen enskri nan seksyon "Devlopè" pou resevwa siy API ou pou jwenn aksè nan done yo.

Objektif prensipal atik sa a se bay yon konpreyansyon jeneral sou itilizasyon enfòmasyon difizyon nan AWS; nou pran an konsiderasyon ke done API yo te itilize a retounen yo pa entèdi ajou epi yo transmèt nan kachèt la, ki se fòme baze sou rechèch itilizatè yo nan sit Aviasales.ru ak Jetradar.com pou dènye 48 èdtan.

Kinesis-ajan, enstale sou machin nan pwodwi, resevwa atravè API a pral otomatikman analize ak transmèt done nan kouran an vle atravè Kinesis Data Analytics. Vèsyon kri kouran sa a pral ekri dirèkteman nan magazen an. Depo done anvan tout koreksyon ki deplwaye nan DynamoDB pral pèmèt analiz tikè pi pwofon atravè zouti BI, tankou AWS Quick Sight.

Nou pral konsidere de opsyon pou deplwaye tout enfrastrikti a:

  • Manyèl - atravè AWS Management Console;
  • Enfrastrikti ki soti nan kòd Terraform se pou otomatik parese;

Achitekti nan sistèm nan devlope

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Konpozan yo itilize:

  • Aviasales API — done API sa a retounen yo pral itilize pou tout travay ki vin apre yo;
  • EC2 Pwodiktè Enstans — yon machin vityèl regilye nan nwaj la kote yo pral pwodwi kouran done antre:
    • Kinesis Agent se yon aplikasyon Java enstale lokalman sou machin nan ki bay yon fason fasil pou kolekte epi voye done nan Kinesis (Kinesis Data Streams oswa Kinesis Firehose). Ajan an toujou ap kontwole yon seri fichye nan anyè yo espesifye epi li voye nouvo done bay Kinesis;
    • Script moun kap rele API — Yon script Python ki fè demann nan API a epi mete repons lan nan yon katab ke Ajan Kinesis la kontwole;
  • Kinesis Data Streams — sèvis difizyon done an tan reyèl ak kapasite dechèl lajè;
  • Kinesis Analytics se yon sèvis san sèvè ki senplifye analiz done difizyon an tan reyèl. Amazon Kinesis Data Analytics konfigirasyon resous aplikasyon yo ak otomatikman balanse pou okipe nenpòt volim done k ap rantre;
  • AWS Lambda — yon sèvis ki pèmèt ou kouri kòd san yo pa fè bak oswa mete kanpe sèvè. Tout pouvwa enfòmatik otomatikman echèl pou chak apèl;
  • Amazon DynamoDB - Yon baz done pè kle-valè ak dokiman ki bay latansi mwens pase 10 milisgond lè w ap kouri nan nenpòt echèl. Lè w ap itilize DynamoDB, ou pa bezwen pwovizyon, patch, oswa jere okenn sèvè. DynamoDB otomatikman echèl tab yo ajiste kantite resous ki disponib yo epi kenbe pèfòmans segondè. Pa gen okenn administrasyon sistèm obligatwa;
  • Amazon SNS - yon sèvis konplètman jere pou voye mesaj lè l sèvi avèk modèl piblikatè-abònen (Pub/Sub), ak ki ou ka izole mikwosèvis, sistèm distribiye ak aplikasyon san sèvè. SNS ka itilize pou voye enfòmasyon bay itilizatè final yo atravè notifikasyon pouse mobil, mesaj SMS ak imèl.

Premye fòmasyon

Pou imite koule done a, mwen deside sèvi ak enfòmasyon sou tikè avyon Aviasales API retounen. NAN dokiman byen yon lis vaste nan metòd diferan, se pou nou pran youn nan yo - "Kalandriye pri chak mwa", ki retounen pri pou chak jou nan mwa a, gwoupe pa kantite transfè. Si w pa presize mwa rechèch la nan demann lan, yo pral retounen enfòmasyon pou mwa ki vin apre a.

Se konsa, ann anrejistre epi jwenn siy nou an.

Yon egzanp demann anba a:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Metòd ki anwo a pou resevwa done ki soti nan API a lè w espesifye yon siy nan demann lan ap travay, men mwen prefere pase siy aksè a nan header la, kidonk nou pral sèvi ak metòd sa a nan script api_caller.py.

Egzanp repons:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Egzanp repons API ki pi wo a montre yon tikè soti St. Petersburg pou Phuk... Oh, ki rèv...
Depi mwen soti Kazan, ak Phuket se kounye a "sèlman yon rèv", ann gade pou tikè soti nan Saint Petersburg Kazan.

Li sipoze ke ou deja gen yon kont AWS. Mwen ta renmen imedyatman atire atansyon espesyal sou lefèt ke Kinesis ak voye notifikasyon via SMS pa enkli nan anyèl la. Nivo gratis (itilizasyon gratis). Men, menm malgre sa, ak yon koup de dola nan tèt ou, li se byen posib yo bati sistèm yo pwopoze a epi jwe ak li. Epi, nan kou, pa bliye efase tout resous apre yo pa nesesè ankò.

Erezman, fonksyon DynamoDb ak lambda yo pral gratis pou nou si nou satisfè limit gratis chak mwa nou yo. Pou egzanp, pou DynamoDB: 25 GB nan depo, 25 WCU / RCU ak 100 milyon demann. Ak yon milyon apèl fonksyon lambda pa mwa.

Deplwaman manyèl sistèm

Mete kanpe Kinesis Data Streams

Ann ale nan sèvis Kinesis Data Streams epi kreye de nouvo kouran, yon shard pou chak.

Ki sa ki se yon shard?
Yon shard se inite transfè done debaz nan yon kouran Amazon Kinesis. Yon segman bay transfè done opinyon nan yon vitès 1 MB / s ak transfè done pwodiksyon nan yon vitès 2 MB / s. Yon segman sipòte jiska 1000 antre PUT pou chak segonn. Lè w ap kreye yon kouran done, ou bezwen presize kantite segman ki nesesè yo. Pou egzanp, ou ka kreye yon kouran done ak de segman. Ravin done sa a pral bay transfè done opinyon nan 2 MB / s ak transfè done pwodiksyon nan 4 MB / s, sipòte jiska 2000 dosye PUT pou chak segonn.

Plis shards nan kouran ou a, se pi gwo debi li yo. Nan prensip, sa a se ki jan koule yo echèl - lè yo ajoute shards. Men, plis shards ou genyen, se pi gwo pri a. Chak shard koute 1,5 santim pou chak èdtan ak yon lòt 1.4 santim pou chak milyon inite chaj PUT.

Ann kreye yon nouvo kouran ak non an avyon_tickets, 1 teson ap ase pou li:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Koulye a, ann kreye yon lòt fil ak non an espesyal_kouran:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè

Konfigirasyon pwodiktè

Pou analize yon travay, li ase pou itilize yon egzanp regilye EC2 kòm yon pwodiktè done. Li pa dwe yon machin vityèl pwisan, chè, yon plas t2.micro ap fè jis byen.

Nòt enpòtan: pou egzanp, ou ta dwe itilize imaj - Amazon Linux AMI 2018.03.0, li gen mwens anviwònman pou byen vit lanse Ajan Kinesis la.

Ale nan sèvis EC2 a, kreye yon nouvo machin vityèl, chwazi AMI a vle ak kalite t2.micro, ki enkli nan Nivo gratis la:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Nan lòd pou machin vityèl ki fèk kreye a kapab kominike avèk sèvis Kinesis la, yo dwe bay li dwa pou fè sa. Pi bon fason pou fè sa se bay yon wòl IAM. Se poutèt sa, sou Etap 3: Konfigirasyon Detay Enstans ekran an, ou ta dwe chwazi Kreye nouvo wòl IAM:

Kreye yon wòl IAM pou EC2
Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Nan fenèt ki ouvè a, chwazi ke nou ap kreye yon nouvo wòl pou EC2 epi ale nan seksyon otorizasyon:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Sèvi ak egzanp fòmasyon an, nou pa bezwen antre nan tout sibtilite konfigirasyon granulaire nan dwa resous, kidonk nou pral chwazi règleman yo pre-konfigirasyon pa Amazon: AmazonKinesisFullAccess ak CloudWatchFullAccess.

Ann bay kèk non ki gen sans pou wòl sa a, pa egzanp: EC2-KinesisStreams-FullAccess. Rezilta a ta dwe menm jan yo montre nan foto ki anba a:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Apre ou fin kreye nouvo wòl sa a, pa bliye tache li nan egzanp machin vityèl kreye a:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Nou pa chanje anyen sou ekran sa a epi ale nan pwochen fenèt yo.

Anviwònman kondwi difisil yo ka kite kòm default, osi byen ke tags yo (byenke li se bon pratik yo sèvi ak tags, omwen bay egzanp lan yon non epi endike anviwònman an).

Koulye a, nou sou Etap 6: Konfigure tab Gwoup Sekirite, kote ou bezwen kreye yon nouvo oswa presize gwoup Sekirite ki egziste deja ou a, ki pèmèt ou konekte atravè ssh (pò 22) nan egzanp lan. Chwazi Sous -> IP mwen an epi ou ka lanse egzanp lan.

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Le pli vit ke li chanje nan sitiyasyon kouri, ou ka eseye konekte ak li atravè ssh.

Pou kapab travay ak Kinesis Agent, apre ou fin konekte ak machin nan avèk siksè, ou dwe antre kòmandman sa yo nan tèminal la:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Ann kreye yon katab pou konsève pou repons API yo:

sudo mkdir /var/log/airline_tickets

Anvan ou kòmanse ajan an, ou bezwen konfigirasyon konfigirasyon li yo:

sudo vim /etc/aws-kinesis/agent.json

Sa ki nan fichye agent.json ta dwe sanble sa a:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Jan yo ka wè nan fichye konfigirasyon an, ajan an pral kontwole dosye ki gen ekstansyon .log nan anyè /var/log/airline_tickets/, analize yo epi transfere yo nan kouran airline_tickets.

Nou rekòmanse sèvis la epi asire w ke li fonksyone:

sudo service aws-kinesis-agent restart

Koulye a, ann telechaje script Python ki pral mande done nan API a:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Script api_caller.py a mande done nan men Aviasales epi sove repons yo resevwa nan anyè ajan Kinesis la analize. Aplikasyon an nan script sa a se byen estanda, gen yon klas TicketsApi, li pèmèt ou asynchrone rale API a. Nou pase yon header ak yon siy epi mande paramèt nan klas sa a:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Pou teste anviwònman kòrèk ak fonksyonalite ajan an, ann teste kouri script api_caller.py la:

sudo ./api_caller.py TOKEN

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Epi nou gade rezilta travay la nan jounal ajan yo ak sou tab Siveyans nan kouran done airline_tickets la:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Kòm ou ka wè, tout bagay ap travay ak Ajan Kinesis la avèk siksè voye done nan kouran an. Koulye a, kite a konfigirasyon konsomatè.

Mete kanpe Kinesis Data Analytics

Ann ale nan eleman santral la nan tout sistèm nan - kreye yon nouvo aplikasyon nan Kinesis Data Analytics ki rele kinesis_analytics_airlines_app:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Kinesis Data Analytics pèmèt ou fè analiz done an tan reyèl nan Kinesis Streams lè l sèvi avèk lang SQL. Li se yon sèvis totalman otoscaling (kontrèman ak Kinesis Streams) ki:

  1. pèmèt ou kreye nouvo kouran (Output Stream) ki baze sou demann nan done sous;
  2. bay yon kouran ak erè ki te fèt pandan aplikasyon yo t ap kouri (Error Stream);
  3. ka otomatikman detèmine konplo a done opinyon (li ka manyèlman redefini si sa nesesè).

Sa a se pa yon sèvis bon mache - 0.11 USD pou chak èdtan nan travay, kidonk ou ta dwe itilize li ak anpil atansyon epi efase li lè ou fini.

Ann konekte aplikasyon an ak sous done a:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Chwazi kouran nou pral konekte a (airline_tickets):

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Apre sa, ou bezwen tache yon nouvo wòl IAM pou aplikasyon an ka li nan kouran an epi ekri nan kouran an. Pou fè sa, li ase pou pa chanje anyen nan blòk otorizasyon Aksè:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Koulye a, ann mande dekouvèt chema done nan kouran an; pou fè sa, klike sou bouton "Dekouvri chema". Kòm yon rezilta, yo pral mete ajou wòl IAM (yo pral kreye yon nouvo) epi yo pral lanse deteksyon chema apati done ki deja rive nan kouran an:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Koulye a, ou bezwen ale nan editè a SQL. Lè w klike sou bouton sa a, yon fenèt ap parèt k ap mande w lanse aplikasyon an - chwazi sa w vle lanse:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Antre rechèch senp sa a nan fenèt editè SQL la epi klike sou Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Nan baz done relasyon, ou travay ak tab lè l sèvi avèk deklarasyon INSERT pou ajoute dosye ak yon deklarasyon SELECT pou rechèch done. Nan Amazon Kinesis Data Analytics, ou travay ak kouran (STREAMs) ak ponp (PUMPs)—demann insert kontinyèl ki mete done ki sòti nan yon kouran nan yon aplikasyon nan yon lòt kouran.

Rekèt SQL ki prezante anwo a ap chèche tikè Aeroflot a yon pri ki pi ba a senk mil rubles. Tout dosye ki satisfè kondisyon sa yo pral mete nan kouran DESTINATION_SQL_STREAM.

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Nan blòk Destinasyon an, chwazi kouran espesyal_stream la, ak nan non kouran nan aplikasyon an DESTINATION_SQL_STREAM lis deroulant:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Rezilta a nan tout manipilasyon yo ta dwe yon bagay ki sanble ak foto ki anba a:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè

Kreye ak abònman nan yon sijè SNS

Ale nan Sèvis Notifikasyon Senp la epi kreye yon nouvo sijè ki gen non Airlines:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Abònman ak sijè sa a epi endike nimewo telefòn mobil kote yo pral voye notifikasyon SMS yo:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè

Kreye yon tab nan DynamoDB

Pou estoke done anvan tout koreksyon ki soti nan kouran airline_tickets yo, ann kreye yon tab nan DynamoDB ak menm non an. Nou pral itilize record_id kòm kle prensipal la:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè

Kreye yon pèseptè fonksyon lambda

Ann kreye yon fonksyon lambda ki rele Pèseptè, ki gen pou l fè sonde kouran airline_tickets la epi, si yo jwenn nouvo dosye la, mete dosye sa yo nan tablo DynamoDB la. Li evidan, anplis dwa default yo, lambda sa a dwe gen aksè lekti nan kouran done Kinesis ak aksè ekri nan DynamoDB.

Kreye yon wòl IAM pou fonksyon lambda pèseptè a
Premyèman, ann kreye yon nouvo wòl IAM pou lambda ki rele Lambda-TicketsProcessingRole:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Pou egzanp tès la, politik AmazonKinesisReadOnlyAccess ak AmazonDynamoDBFullAccess pre-konfigirasyon yo byen apwopriye, jan yo montre nan foto ki anba a:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè

Lambda sa a ta dwe lanse pa yon deklanche soti nan Kinesis lè nouvo antre antre nan airline_stream, kidonk nou bezwen ajoute yon nouvo deklanche:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Tout sa ki rete se mete kòd la epi sove lambda a.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Kreye yon notifikasyon fonksyon lambda

Dezyèm fonksyon lambda a, ki pral kontwole dezyèm kouran an (special_stream) epi voye yon notifikasyon bay SNS, se yon fason ki sanble. Se poutèt sa, lambda sa a dwe gen aksè a li nan Kinesis epi voye mesaj nan yon sijè SNS bay, ki pral Lè sa a, sèvis SNS a voye bay tout abonnés nan sijè sa a (imel, SMS, elatriye).

Kreye yon wòl IAM
Premyèman, nou kreye wòl IAM Lambda-KinesisAlarm pou lambda sa a, epi answit bay wòl sa a alarm_notifier lambda ke yo te kreye:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè

Lambda sa a ta dwe travay sou yon deklanche pou nouvo dosye antre nan special_stream la, kidonk ou bezwen configured deklanche a menm jan nou te fè pou Collector lambda a.

Pou fè li pi fasil pou konfigirasyon lambda sa a, ann prezante yon nouvo varyab anviwònman - TOPIC_ARN, kote nou mete ANR (Amazon Recourse Names) nan sijè Airlines la:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Epi mete kòd lambda a, li pa konplike ditou:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Li sanble ke sa a se kote konfigirasyon sistèm manyèl la fini. Tout sa ki rete se teste ak asire w ke nou te configuré tout bagay kòrèkteman.

Deplwaye soti nan kòd Terraform

Preparasyon ki nesesè

Terraform se yon zouti sous louvri trè pratik pou deplwaye enfrastrikti nan kòd. Li gen sentaks pwòp li yo ki fasil pou aprann epi li gen anpil egzanp sou ki jan ak ki sa yo deplwaye. Editè Atom la oswa Kòd Visual Studio gen anpil grefon sou la men ki fè travay ak Terraform pi fasil.

Ou ka telechaje distribisyon an kon sa. Yon analiz detaye sou tout kapasite Terraform pi lwen pase sijè ki abòde lan atik sa a, kidonk nou pral limite tèt nou nan pwen prensipal yo.

Ki jan yo kòmanse

Kòd la konplè nan pwojè a se nan depo mwen an. Nou klonaj repozitwa a pou tèt nou. Anvan w kòmanse, ou bezwen asire w ke ou gen AWS CLI enstale ak konfigirasyon, paske... Terraform ap chèche kalifikasyon nan dosye ~/.aws/credentials la.

Yon bon pratik se kouri lòd plan an anvan deplwaye tout enfrastrikti a pou wè sa Terraform ap kreye kounye a pou nou nan nwaj la:

terraform.exe plan

Y ap mande w pou w antre yon nimewo telefòn pou w voye notifikasyon. Li pa nesesè pou antre nan li nan etap sa a.

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Lè nou fin analize plan operasyon pwogram nan, nou ka kòmanse kreye resous:

terraform.exe apply

Apre w fin voye kòmandman sa a, yo pral mande w ankò pou antre yon nimewo telefòn; konpoze "wi" lè yo montre yon kesyon sou aktyèlman fè aksyon yo. Sa a pral pèmèt ou mete kanpe tout enfrastrikti a, fè tout konfigirasyon ki nesesè nan EC2, deplwaye fonksyon lambda, elatriye.

Apre tout resous yo te kreye avèk siksè atravè kòd Terraform, ou bezwen ale nan detay yo nan aplikasyon an Kinesis Analytics (malerezman, mwen pa t 'jwenn ki jan fè sa dirèkteman nan kòd la).

Lanse aplikasyon an:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Apre sa, ou dwe klèman mete non kouran nan aplikasyon an lè w chwazi nan lis deroulant a:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Koulye a, tout bagay pare yo ale.

Tès aplikasyon an

Kèlkeswa fason ou te deplwaye sistèm nan, manyèlman oswa atravè kòd Terraform, li pral travay menm jan an.

Nou konekte via SSH nan machin vityèl EC2 kote Kinesis Agent enstale epi kouri script api_caller.py.

sudo ./api_caller.py TOKEN

Tout sa ou dwe fè se tann yon SMS nan nimewo ou:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
SMS - mesaj la rive sou telefòn la nan prèske 1 minit:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè
Li rete pou wè si dosye yo te sove nan baz done DynamoDB pou analiz ki vin apre, pi detaye. Tablo airline_tickets la gen apeprè done sa yo:

Entegrasyon API Aviasales ak Amazon Kinesis ak senplisite san sèvè

Konklizyon

Nan kou a nan travay la te fè, yon sistèm pwosesis done sou entènèt te bati ki baze sou Amazon Kinesis. Opsyon pou itilize Kinesis Agent an konjonksyon avèk Kinesis Data Streams ak analiz an tan reyèl Kinesis Analytics lè l sèvi avèk kòmandman SQL, ansanm ak entèraksyon Amazon Kinesis ak lòt sèvis AWS yo te konsidere.

Nou te deplwaye sistèm ki anwo a nan de fason: yon manyèl olye long ak yon sèl rapid nan kòd Terraform la.

Tout kòd sous pwojè a disponib nan depo GitHub mwen an, Mwen sijere ou familyarize w avèk li.

Mwen kontan diskite sou atik la, mwen tann kòmantè ou yo. Mwen espere pou kritik konstriktif.

Mwen swete w siksè!

Sous: www.habr.com

Add nouvo kòmantè