Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco

Hej Habr!

Ĉu vi ŝatas flugi aviadilojn? Mi amas ĝin, sed dum memizolado mi ankaŭ enamiĝis analizi datumojn pri flugbiletoj de unu konata rimedo - Aviasales.

Hodiaŭ ni analizos la laboron de Amazon Kinesis, konstruos fluan sistemon kun realtempa analizo, instalos la Amazon DynamoDB NoSQL-datumbazon kiel ĉefan datumstokadon kaj starigos SMSajn sciigojn por interesaj biletoj.

Ĉiuj detaloj estas sub la tranĉo! Iru!

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco

Enkonduko

Por la ekzemplo, ni bezonas aliron al Aviasales API. Aliro al ĝi estas senpage kaj sen limigo; vi nur bezonas registriĝi en la sekcio "Programistoj" por ricevi vian API-ĵetonon por aliri la datumojn.

La ĉefa celo de ĉi tiu artikolo estas doni ĝeneralan komprenon pri la uzo de informa fluado en AWS; ni konsideras, ke la datumoj redonitaj de la uzata API ne estas strikte ĝisdatigitaj kaj estas transdonitaj el la kaŝmemoro, kiu estas formita surbaze de serĉoj de uzantoj de la retejoj Aviasales.ru kaj Jetradar.com dum la lastaj 48 horoj.

Kinesis-agento, instalita sur la produktanta maŝino, ricevita per la API aŭtomate analizos kaj transdonos datumojn al la dezirata fluo per Kinesis Data Analytics. La kruda versio de ĉi tiu fluo estos skribita rekte al la vendejo. La stokado de krudaj datumoj deplojita en DynamoDB permesos pli profundan biletan analizon per BI-iloj, kiel AWS Quick Sight.

Ni konsideros du eblojn por disfaldi la tutan infrastrukturon:

  • Manlibro - per AWS Management Console;
  • Infrastrukturo de Terraform-kodo estas por maldiligentaj aŭtomatistoj;

Arkitekturo de la evoluinta sistemo

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Uzitaj komponantoj:

  • Aviasales API — la datumoj redonitaj de ĉi tiu API estos uzataj por ĉiuj postaj laboroj;
  • EC2-Produktanto-Okazaĵo - regula virtuala maŝino en la nubo, sur kiu la eniga datumfluo estos generita:
    • Kinesis Agento estas Java aplikaĵo instalita loke sur la maŝino, kiu provizas facilan manieron kolekti kaj sendi datumojn al Kinesis (Kinesis Data Streams aŭ Kinesis Firehose). La agento konstante kontrolas aron da dosieroj en la specifitaj dosierujoj kaj sendas novajn datumojn al Kinesis;
    • API-Alvokanta Skripto — Python-skripto, kiu faras petojn al la API kaj metas la respondon en dosierujon monitoritan de la Kinesis Agent;
  • Kinesis Data Streams — realtempa datumflua servo kun larĝaj skalaj kapabloj;
  • Kinesis Analytics estas senservila servo, kiu simpligas la analizon de fluaj datumoj en reala tempo. Amazon Kinesis Data Analytics agordas aplikaĵajn rimedojn kaj aŭtomate skalas por manipuli ajnan volumon de envenantaj datumoj;
  • AWS Lambda — servo, kiu ebligas al vi ruli kodon sen sekurkopii aŭ agordi servilojn. Ĉiu komputika potenco estas aŭtomate skalita por ĉiu voko;
  • Amazon DynamoDB - Datumaro de ŝlosil-valoraj paroj kaj dokumentoj, kiu provizas latentecon de malpli ol 10 milisekundoj dum funkciado je ajna skalo. Kiam vi uzas DynamoDB, vi ne bezonas provizi, fliki aŭ administri iujn ajn servilojn. DynamoDB aŭtomate skalas tablojn por ĝustigi la kvanton de disponeblaj rimedoj kaj konservi altan rendimenton. Neniu sistema administrado estas bezonata;
  • Amazon SNS - plene administrita servo por sendi mesaĝojn per la modelo de eldonejo-abonanto (Pub/Sub), per kiu vi povas izoli mikroservojn, distribuitajn sistemojn kaj senservilajn aplikaĵojn. SNS povas esti uzata por sendi informojn al finuzantoj per moveblaj puŝo-scioj, SMS-mesaĝoj kaj retpoŝtoj.

Komenca trejnado

Por emuli la datumfluon, mi decidis uzi la flugbiletajn informojn resenditajn de la Aviasales API. EN dokumentado sufiĉe ampleksa listo de malsamaj metodoj, ni prenu unu el ili - "Ĉiumonata Prezo-Kalendaro", kiu redonas prezojn por ĉiu tago de la monato, grupigitaj laŭ la nombro da translokigoj. Se vi ne specifas la serĉmonaton en la peto, informoj estos resenditaj por la monato post la nuna.

Do, ni registriĝu kaj ricevu nian ĵetonon.

Ekzempla peto estas sube:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

La supra metodo ricevi datumojn de la API per specifo de ĵetono en la peto funkcios, sed mi preferas pasi la alir-ĵetonon tra la kaplinio, do ni uzos ĉi tiun metodon en la skripto api_caller.py.

Ekzemplo de respondo:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

La ekzemplo de API-respondo supre montras bileton de Sankt-Peterburgo al Phuk... Ho, kia sonĝo...
Ĉar mi estas el Kazan, kaj Phuket nun estas "nur revo", ni serĉu biletojn de Sankt-Peterburgo al Kazan.

Ĝi supozas, ke vi jam havas AWS-konton. Mi ŝatus tuj atentigi speciale pri tio, ke Kinesis kaj sendo de sciigoj per SMS ne estas inkluzivitaj en la ĉiujara. Senpaga Nivelo (senpaga uzo). Sed eĉ malgraŭ tio, kun kelkaj dolaroj en menso, estas tute eble konstrui la proponitan sistemon kaj ludi kun ĝi. Kaj, kompreneble, ne forgesu forigi ĉiujn rimedojn post kiam ili ne plu bezonas.

Feliĉe, DynamoDb kaj lambda funkcioj estos senpagaj por ni se ni plenumos niajn monatajn senpagajn limojn. Ekzemple, por DynamoDB: 25 GB da stokado, 25 WCU/RCU kaj 100 milionoj da demandoj. Kaj miliono da lambda funkcio vokoj monate.

Mana sistema deplojo

Agordi Kinesis Data Streams

Ni iru al la servo Kinesis Data Streams kaj kreu du novajn fluojn, po unu peceto por ĉiu.

Kio estas peceto?
Breĉeto estas la baza datuma transiga unuo de Amazon Kinesis-rivereto. Unu segmento disponigas enigan datumtranslokigon kun rapideco de 1 MB/s kaj eliga datumtransigo kun rapideco de 2 MB/s. Unu segmento subtenas ĝis 1000 PUT-enskribojn je sekundo. Kiam vi kreas datumfluon, vi devas specifi la postulatan nombron da segmentoj. Ekzemple, vi povas krei datumfluon kun du segmentoj. Ĉi tiu datumfluo provizos enigan datumtranslokigon je 2 MB/s kaj eliga datumtransigo je 4 MB/s, subtenante ĝis 2000 PUT-rekordojn sekundo.

Ju pli da fragmentoj en via fluo, des pli granda estas ĝia trafluo. Principe jen kiel fluoj estas skalaj - per aldonado de pecetoj. Sed ju pli da pecetoj vi havas, des pli alta la prezo. Ĉiu peceto kostas 1,5 cendojn hore kaj pliajn 1.4 cendojn por ĉiu miliono da PUT-ŝarĝaj unuoj.

Ni kreu novan rivereton kun la nomo flugkompaniaj_biletoj, 1 peceto sufiĉos por li:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Nun ni kreu alian fadenon kun la nomo speciala_fluo:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco

Agordo de produktanto

Por analizi taskon, sufiĉas uzi kutiman petskribon de EC2 kiel datumproduktanton. Ĝi ne devas esti potenca, multekosta virtuala maŝino; spot t2.micro bone agas.

Grava noto: ekzemple, vi devus uzi bildon - Amazon Linukso AMI 2018.03.0, ĝi havas malpli da agordoj por rapide lanĉi la Kinesis Agent.

Iru al la servo EC2, kreu novan virtualan maŝinon, elektu la deziratan AMI kun tipo t2.micro, kiu estas inkluzivita en la Senpaga Nivelo:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Por ke la nove kreita virtuala maŝino povu interagi kun la servo Kinesis, oni devas doni al ĝi rajtojn fari tion. La plej bona maniero fari tion estas atribui IAM-Rolon. Sekve, sur la ekrano Paŝo 3: Agordi Kazajn Detalojn, vi devus elekti Kreu novan IAM-Rolon:

Kreante IAM-rolon por EC2
Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
En la fenestro kiu malfermiĝas, elektu, ke ni kreas novan rolon por EC2 kaj iru al la sekcio Permesoj:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Uzante la ekzemplon de trejnado, ni ne devas eniri ĉiujn komplikaĵojn de granula agordo de rimedrajtoj, do ni elektos la politikojn antaŭ-agorditajn de Amazon: AmazonKinesisFullAccess kaj CloudWatchFullAccess.

Ni donu ian signifoplenan nomon por ĉi tiu rolo, ekzemple: EC2-KinesisStreams-FullAccess. La rezulto devus esti la sama kiel montrita en la bildo sube:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Post kreado de ĉi tiu nova rolo, ne forgesu ligi ĝin al la kreita virtuala maŝino petskribo:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Ni ne ŝanĝas ion alian sur ĉi tiu ekrano kaj pluiras al la sekvaj fenestroj.

La durdiskaj agordoj povas esti lasitaj kiel defaŭlte, same kiel la etikedoj (kvankam estas bona praktiko uzi etikedojn, almenaŭ donu al la instanco nomon kaj indiku la medion).

Nun ni estas sur la Paŝo 6: Agordu Sekurecan Grupon langeto, kie vi devas krei novan aŭ specifi vian ekzistantan Sekurecan grupon, kiu ebligas vin konekti per ssh (haveno 22) al la petskribo. Elektu Fonton -> Mia IP tie kaj vi povas lanĉi la petskribon.

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Tuj kiam ĝi ŝanĝas al funkcia stato, vi povas provi konekti al ĝi per ssh.

Por povi labori kun Kinesis Agent, post sukcese konektiĝi al la maŝino, vi devas enigi la jenajn komandojn en la terminalo:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Ni kreu dosierujon por konservi API-respondojn:

sudo mkdir /var/log/airline_tickets

Antaŭ ol komenci la agenton, vi devas agordi ĝian agordon:

sudo vim /etc/aws-kinesis/agent.json

La enhavo de la dosiero agent.json devus aspekti jene:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Kiel videblas el la agorda dosiero, la agento kontrolos dosierojn kun la etendo .log en la dosierujo /var/log/airline_tickets/, analizos ilin kaj transdonos ilin al la fluo de airline_tickets.

Ni rekomencas la servon kaj certigas, ke ĝi funkcias:

sudo service aws-kinesis-agent restart

Nun ni elŝutu la Python-skripton, kiu petos datumojn de la API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

La skripto api_caller.py petas datumojn de Aviasales kaj konservas la ricevitan respondon en la dosierujo, kiun la agento Kinesis skanas. La efektivigo de ĉi tiu skripto estas sufiĉe norma, ekzistas TicketsApi-klaso, ĝi permesas al vi nesinkrone tiri la API. Ni pasas kaplinion kun ĵetono kaj petas parametrojn al ĉi tiu klaso:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Por testi la ĝustajn agordojn kaj funkciojn de la agento, ni provu ruli la skripton api_caller.py:

sudo ./api_caller.py TOKEN

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Kaj ni rigardas la rezulton de la laboro en la protokoloj de Agentoj kaj en la langeto Monitorado en la datumfluo de airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Kiel vi povas vidi, ĉio funkcias kaj la Kinesis Agent sukcese sendas datumojn al la rivereto. Nun ni agordu konsumanton.

Agordi Kinesis Data Analytics

Ni transiru al la centra komponanto de la tuta sistemo - kreu novan aplikaĵon en Kinesis Data Analytics nomita kinesis_analytics_airlines_app:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Kinesis Data Analytics ebligas al vi realtempan datuman analizon de Kinesis Streams uzante la SQL-lingvon. Ĝi estas plene aŭtoskala servo (male al Kinesis Streams) tio:

  1. permesas krei novajn fluojn (Eliga Fluo) surbaze de petoj al fontaj datumoj;
  2. provizas fluon kun eraroj kiuj okazis dum aplikoj estis ruliĝantaj (Erara Fluo);
  3. povas aŭtomate determini la enigan datumskemon (ĝi povas esti mane redifinita se necese).

Ĉi tio ne estas malmultekosta servo - 0.11 USD po horo da laboro, do vi devas uzi ĝin zorge kaj forigi ĝin kiam vi finos.

Ni konektu la aplikaĵon al la datumfonto:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Elektu la rivereton al kiu ni konektos (airline_tickets):

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Poste, vi devas kunligi novan IAM-Rolon por ke la aplikaĵo povu legi el la fluo kaj skribi al la rivereto. Por fari tion, sufiĉas ne ŝanĝi ion ajn en la bloko de Aliro-permesoj:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Nun ni petu malkovron de la datumskemo en la fluo; por fari tion, alklaku la butonon "Malkovru skemon". Kiel rezulto, la IAM-rolo estos ĝisdatigita (nova estos kreita) kaj skemo-detekto estos lanĉita de la datumoj kiuj jam alvenis en la fluo:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Nun vi devas iri al la SQL-redaktilo. Kiam vi alklakas ĉi tiun butonon, fenestro aperos petante vin lanĉi la aplikaĵon - elektu tion, kion vi volas lanĉi:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Enigu la sekvan simplan demandon en la fenestron de SQL-redaktilo kaj alklaku Konservi kaj Rulu SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

En interrilataj datumbazoj, vi laboras kun tabeloj uzante INSERT-deklarojn por aldoni rekordojn kaj SELECT-demanieron por demandi datumojn. En Amazon Kinesis Data Analytics, vi laboras kun fluoj (STREAM-oj) kaj pumpiloj (PUMP-oj) - daŭraj enmetaj petoj, kiuj enmetas datumojn de unu fluo en aplikaĵo en alian fluon.

La SQL-demando prezentita supre serĉas Aeroflot-biletojn je kosto sub kvin mil rubloj. Ĉiuj rekordoj kiuj plenumas ĉi tiujn kondiĉojn estos metitaj en la fluo DESTINATION_SQL_STREAM.

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
En la Celbloko, elektu la special_stream-fluon, kaj en la En-aplika fluo-nomo DESTINATION_SQL_STREAM fal-listo:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
La rezulto de ĉiuj manipuladoj devus esti io simila al la bildo sube:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco

Krei kaj aboni SNS-temon

Iru al la Simpla Sciiga Servo kaj kreu tie novan temon kun la nomo Flugkompanioj:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Abonu ĉi tiun temon kaj indiku la poŝtelefonnumeron al kiu estos senditaj SMSaj sciigoj:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco

Kreu tabelon en DynamoDB

Por konservi la krudajn datumojn de ilia flugo de airline_tickets, ni kreu tabelon en DynamoDB kun la sama nomo. Ni uzos record_id kiel la ĉefa ŝlosilo:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco

Kreante lambda funkcio-kolektilo

Ni kreu lambdan funkcion nomatan Kolektanto, kies tasko estos baloti la fluon de airline_tickets kaj, se novaj rekordoj troviĝas tie, enigu ĉi tiujn rekordojn en la tabelon de DynamoDB. Evidente, krom la defaŭltaj rajtoj, ĉi tiu lambda devas havi legan aliron al la datumfluo de Kinesis kaj skribaliron al DynamoDB.

Kreante IAM-rolon por la kolektanta lambda funkcio
Unue, ni kreu novan IAM-rolon por la lambda nomita Lambda-TicketsProcessingRole:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Por la prova ekzemplo, la antaŭ-agordita AmazonKinesisReadOnlyAccess kaj AmazonDynamoDBFullAccess politikoj estas sufiĉe taŭgaj, kiel montrite en la bildo sube:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco

Ĉi tiu lambda devus esti lanĉita per ellasilo de Kinesis kiam novaj eniroj eniras la airline_stream, do ni devas aldoni novan ellasilon:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Restas nur enigi la kodon kaj konservi la lambdon.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Kreante lambda-funkcian sciiganton

La dua lambda funkcio, kiu kontrolos la duan fluon (special_stream) kaj sendos sciigon al SNS, estas kreita en simila maniero. Tial ĉi tiu lambdo devas havi aliron por legi de Kinesis kaj sendi mesaĝojn al difinita SNS-temo, kiu tiam estos sendita de la SNS-servo al ĉiuj abonantoj de ĉi tiu temo (retpoŝto, SMS, ktp.).

Kreante IAM-rolon
Unue, ni kreas la IAM-rolon Lambda-KinesisAlarm por ĉi tiu lambdo, kaj poste asignas ĉi tiun rolon al la alarm_notifier lambda kreita:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco

Ĉi tiu lambdo devus funkcii sur ellasilo por novaj rekordoj eniri la special_stream, do vi devas agordi la ellasilon en la sama maniero kiel ni faris por la Kolektanto lambda.

Por faciligi la agordon de ĉi tiu lambda, ni enkonduku novan mediovariablon - TOPIC_ARN, kie ni metas la ANR (Amazon Recourse Names) de la Flugkompaniaj temo:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Kaj enigu la lambdan kodon, ĝi tute ne estas komplika:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Ŝajnas, ke ĉi tie finiĝas la manlibro-sistema agordo. Restas nur provi kaj certigi, ke ni agordis ĉion ĝuste.

Deploji el Terraform-kodo

Bezonata preparo

Terraform estas tre oportuna malfermfonta ilo por disfaldi infrastrukturon el kodo. Ĝi havas sian propran sintakson, kiu estas facile lernebla kaj havas multajn ekzemplojn pri kiel kaj kion disfaldi. La Atom-redaktilo aŭ Visual Studio Code havas multajn oportunajn kromaĵojn, kiuj faciligas labori kun Terraform.

Vi povas elŝuti la distribuon de ĉi tie. Detala analizo de ĉiuj Terraform-kapabloj estas preter la amplekso de ĉi tiu artikolo, do ni limigos nin al la ĉefaj punktoj.

Kiel komenci

La plena kodo de la projekto estas en mia deponejo. Ni klonas la deponejon al ni mem. Antaŭ ol komenci, vi devas certigi, ke vi havas AWS CLI instalita kaj agordita, ĉar... Terraform serĉos akreditaĵojn en la ~/.aws/credentials dosiero.

Bona praktiko estas ruli la planan komandon antaŭ deploji la tutan infrastrukturon por vidi kion Terraform nuntempe kreas por ni en la nubo:

terraform.exe plan

Oni petos vin enigi telefonnumeron al kiu sendi sciigojn. Ne necesas eniri ĝin en ĉi tiu etapo.

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Analizinte la operacian planon de la programo, ni povas komenci krei rimedojn:

terraform.exe apply

Post sendi ĉi tiun komandon, oni denove petos vin enigi telefonnumeron; marku "jes" kiam demando pri efektive plenumi la agojn montriĝas. Ĉi tio permesos al vi agordi la tutan infrastrukturon, efektivigi la tutan necesan agordon de EC2, disfaldi lambda-funkciojn ktp.

Post kiam ĉiuj rimedoj estis sukcese kreitaj per la Terraform-kodo, vi devas eniri la detalojn de la aplikaĵo Kinesis Analytics (bedaŭrinde, mi ne trovis kiel fari tion rekte de la kodo).

Lanĉu la aplikaĵon:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Post ĉi tio, vi devas eksplicite agordi la en-aplikan fluonomon elektante el la fallisto:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Nun ĉio estas preta por iri.

Provante la aplikaĵon

Sendepende de kiel vi deplojis la sistemon, permane aŭ per Terraform-kodo, ĝi funkcios same.

Ni ensalutas per SSH al la virtuala maŝino EC2 kie Kinesis Agent estas instalita kaj rulas la skripton api_caller.py

sudo ./api_caller.py TOKEN

Vi nur devas atendi SMS al via numero:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
SMS - mesaĝo alvenas al la telefono en preskaŭ 1 minuto:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco
Restas vidi ĉu la rekordoj estis konservitaj en la datumbazo de DynamoDB por posta pli detala analizo. La tabelo airline_tickets enhavas proksimume la sekvajn datumojn:

Aviasales API-integriĝo kun Amazon Kinesis kaj senservila simpleco

konkludo

En la kurso de la farita laboro, interreta datumtraktadsistemo estis konstruita surbaze de Amazon Kinesis. Opcioj por uzi la Kinesis Agent kune kun Kinesis Data Streams kaj realtempa analizo Kinesis Analytics uzante SQL-komandojn, same kiel la interago de Amazon Kinesis kun aliaj AWS-servoj estis pripensitaj.

Ni deplojis la ĉi-supran sistemon en du manieroj: sufiĉe longa manlibro kaj rapida el la Terraform-kodo.

Ĉiuj projektaj fontkodo disponeblas en mia GitHub-deponejo, mi sugestas vin familiarigi vin kun ĝi.

Mi ĝojas diskuti la artikolon, mi antaŭĝojas viajn komentojn. Mi esperas konstruan kritikon.

Mi deziras al vi sukceson!

fonto: www.habr.com

Aldoni komenton