Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus

Tere Habr!

Kas teile meeldivad lennukid? Ma armastan seda, kuid eneseisolatsiooni ajal armusin ka lennupiletite andmete analüüsimisse ühest tuntud ressursist - Aviasalesist.

Täna analüüsime Amazon Kinesise tööd, ehitame reaalajas analüütikaga voogedastussüsteemi, installime põhiliseks andmesalvestuseks Amazon DynamoDB NoSQL andmebaasi ning seadistame huvitavate piletite jaoks SMS-teavitused.

Kõik detailid on lõike all! Mine!

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus

Sissejuhatus

Näiteks vajame juurdepääsu Aviasales API. Juurdepääs sellele on tasuta ja piiranguteta; andmetele juurdepääsu saamiseks peate lihtsalt registreeruma jaotises Arendajad.

Selle artikli põhieesmärk on anda üldine arusaam teabe voogesituse kasutamisest AWS-is; võtame arvesse, et kasutatava API poolt tagastatud andmed ei ole rangelt ajakohased ja edastatakse vahemälust, mis on moodustatud saitide Aviasales.ru ja Jetradar.com kasutajate viimase 48 tunni otsingute põhjal.

Tootmismasinale installitud, API kaudu saadud Kinesis-agent parsib automaatselt ja edastab andmed Kinesis Data Analyticsi kaudu soovitud voogu. Selle voo töötlemata versioon kirjutatakse otse poodi. DynamoDB-s juurutatud töötlemata andmete salvestamine võimaldab BI-tööriistade (nt AWS Quick Sight) kaudu piletite sügavamat analüüsi.

Kaalume kogu infrastruktuuri juurutamiseks kahte võimalust:

  • Käsiraamat - AWS-i halduskonsooli kaudu;
  • Terraformi koodi infrastruktuur on laiskadele automatisaatoritele;

Arendatava süsteemi arhitektuur

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Kasutatud komponendid:

  • Aviasales API — selle API poolt tagastatud andmeid kasutatakse kogu järgneva töö jaoks;
  • EC2 tootja eksemplar — tavaline pilves olev virtuaalmasin, milles sisendandmevoog genereeritakse:
    • Kineesi agent on masinasse lokaalselt installitud Java-rakendus, mis pakub lihtsat viisi andmete kogumiseks ja saatmiseks Kinesisesse (Kinesis Data Streams või Kinesis Firehose). Agent jälgib pidevalt failide komplekti määratud kataloogides ja saadab Kinesisele uusi andmeid;
    • API helistaja skript — Pythoni skript, mis esitab API-le päringuid ja paneb vastuse kausta, mida jälgib Kinesis Agent;
  • Kinesise andmevood — laia skaleerimisvõimalusega reaalajas andmevoogedastusteenus;
  • Kinesis Analytics on serverita teenus, mis lihtsustab voogesituse andmete reaalajas analüüsi. Amazon Kinesis Data Analytics konfigureerib rakenduse ressursid ja skaleerib automaatselt, et käsitleda mis tahes sissetulevate andmete mahtu;
  • AWS Lambda — teenus, mis võimaldab käivitada koodi ilma varundamiseta või servereid seadistamata. Kogu arvutusvõimsus skaleeritakse automaatselt iga kõne jaoks;
  • Amazon DynamoDB - Võtme-väärtuste paaride ja dokumentide andmebaas, mis tagab mis tahes skaalal käitamise latentsuse alla 10 millisekundi. DynamoDB kasutamisel ei pea te servereid varustama, parandama ega haldama. DynamoDB skaleerib tabeleid automaatselt, et kohandada saadaolevate ressursside hulka ja säilitada kõrget jõudlust. Süsteemi administreerimine pole vajalik;
  • Amazon SNS - täielikult hallatav teenus sõnumite saatmiseks, kasutades väljaandja-abonendi (Pub/Sub) mudelit, mille abil saate eraldada mikroteenuseid, hajutatud süsteeme ja serverita rakendusi. SNS-i saab kasutada lõppkasutajatele teabe saatmiseks mobiili tõukemärguannete, SMS-sõnumite ja e-kirjade kaudu.

Esialgne koolitus

Andmevoo jäljendamiseks otsustasin kasutada Aviasales API tagastatud lennupiletiteavet. IN dokumentatsioon üsna ulatuslik loetelu erinevatest meetoditest, võtame neist ühe - "Kuuhinnakalender", mis tagastab hinnad iga kuu päeva kohta, mis on rühmitatud ülekannete arvu järgi. Kui te päringus otsingukuud ei täpsusta, tagastatakse andmed jooksvale kuule järgneva kuu kohta.

Niisiis, registreerume ja hankime oma märgi.

Taotluse näide on allpool:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Ülaltoodud meetod API-lt andmete vastuvõtmiseks, määrates päringus loa, töötab, kuid eelistan juurdepääsuluba päise kaudu edastada, seega kasutame seda meetodit skriptis api_caller.py.

Vastuse näide:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Ülaltoodud API vastuse näidis näitab piletit Peterburist Phuki... Oh, milline unistus...
Kuna olen pärit Kaasanist ja Phuket on nüüd “ainult unistus”, siis otsime pileteid Peterburist Kaasanisse.

See eeldab, et teil on juba AWS-i konto. Eraldi juhin koheselt tähelepanu asjaolule, et Kinesis ja SMS-iga teadete saatmine ei sisaldu aastas. Tasuta tase (tasuta kasutamine). Kuid isegi sellele vaatamata on paari dollariga silmas pidades täiesti võimalik kavandatud süsteem üles ehitada ja sellega mängida. Ja muidugi ärge unustage kustutada kõiki ressursse pärast seda, kui neid enam vaja pole.

Õnneks on DynamoDb ja lambda funktsioonid meie jaoks tasuta, kui täidame oma igakuised tasuta limiidid. Näiteks DynamoDB jaoks: 25 GB salvestusruumi, 25 WCU/RCU ja 100 miljonit päringut. Ja miljon lambda-funktsiooni kõnet kuus.

Süsteemi käsitsi juurutamine

Kinesise andmevoogude seadistamine

Liigume Kinesise andmevoogude teenusesse ja loome kaks uut voogu, kummagi jaoks üks killuke.

Mis on kild?
Kild on Amazon Kinesise voo põhiline andmeedastusseade. Üks segment tagab sisendandmete edastamise kiirusega 1 MB/s ja väljundandmete edastamise kiirusega 2 MB/s. Üks segment toetab kuni 1000 PUT-kirjet sekundis. Andmevoo loomisel peate määrama vajaliku arvu segmente. Näiteks saate luua kahe segmendiga andmevoo. See andmevoog tagab sisendandmete edastamise kiirusega 2 MB/s ja väljundandmeedastuse kiirusega 4 MB/s, toetades kuni 2000 PUT-kirjet sekundis.

Mida rohkem kilde teie voos on, seda suurem on selle läbilaskevõime. Põhimõtteliselt skaleeritakse vooge nii – kildude lisamisega. Kuid mida rohkem kilde teil on, seda kõrgem on hind. Iga kild maksab 1,5 senti tunnis ja lisaks 1.4 senti iga miljoni PUT-i kasuliku ühiku kohta.

Loome uue nimega voo lennupiletid, talle piisab 1 killust:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Nüüd loome veel ühe nimega lõime eriline_voog:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus

Tootja seadistamine

Ülesande analüüsimiseks piisab, kui kasutada andmetootjana tavalist EC2 eksemplari. See ei pea olema võimas ja kallis virtuaalmasin; kohapealne t2.micro sobib suurepäraselt.

Oluline märkus: näiteks peaksite kasutama pilti - Amazon Linux AMI 2018.03.0, sellel on vähem sätteid Kinesis Agenti kiireks käivitamiseks.

Minge teenusesse EC2, looge uus virtuaalmasin, valige soovitud AMI tüübiga t2.micro, mis sisaldub tasuta astmes:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Selleks, et vastloodud virtuaalmasin saaks Kinesise teenusega suhelda, tuleb talle anda selleks õigused. Parim viis selleks on määrata IAM-i roll. Seetõttu peaksite ekraanil Samm 3: Konfigureeri eksemplari üksikasjad valima Looge uus IAM-i roll:

IAM-i rolli loomine EC2 jaoks
Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Avanevas aknas valige, et loome EC2 jaoks uut rolli, ja minge jaotisse Load:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Kasutades koolitusnäidet, ei pea me süvenema ressursiõiguste üksikasjaliku konfigureerimise keerukustesse, seega valime Amazoni eelkonfigureeritud poliitikad: AmazonKinesisFullAccess ja CloudWatchFullAccess.

Anname sellele rollile tähendusrikka nime, näiteks: EC2-KinesisStreams-FullAccess. Tulemus peaks olema sama, mis on näidatud alloleval pildil:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Pärast selle uue rolli loomist ärge unustage lisada seda loodud virtuaalmasina eksemplarile:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Me ei muuda sellel ekraanil midagi muud ja liigume edasi järgmistesse akendesse.

Kõvaketta sätted saab jätta vaikimisi, nagu ka sildid (kuigi hea tava on siltide kasutamine, anna eksemplarile vähemalt nimi ja märgi keskkond).

Nüüd oleme vahekaardil Samm 6: Configure Security Group, kus peate looma uue või määrama olemasoleva turvarühma, mis võimaldab teil eksemplariga ühenduse luua ssh-i (port 22) kaudu. Valige seal Allikas -> Minu IP ja saate eksemplari käivitada.

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Niipea, kui see lülitub tööolekusse, võite proovida sellega ühenduse luua ssh-i kaudu.

Kinesis Agentiga töötamiseks peate pärast masinaga edukat ühendamist sisestama terminali järgmised käsud:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Loome API vastuste salvestamiseks kausta:

sudo mkdir /var/log/airline_tickets

Enne agendi käivitamist peate konfigureerima selle konfiguratsiooni:

sudo vim /etc/aws-kinesis/agent.json

Faili agent.json sisu peaks välja nägema järgmine:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Nagu konfiguratsioonifailist näha, jälgib agent .log-laiendiga faile kataloogis /var/log/airline_tickets/, sõelub neid ja edastab lennuliini_piletite voogu.

Taaskäivitame teenuse ja veendume, et see töötab ja töötab:

sudo service aws-kinesis-agent restart

Nüüd laadime alla Pythoni skripti, mis küsib API-lt andmeid:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skript api_caller.py küsib Aviasalesilt andmeid ja salvestab saadud vastuse kataloogi, mida Kinesise agent skannib. Selle skripti rakendamine on üsna standardne, seal on TicketsApi klass, see võimaldab teil API-d asünkroonselt tõmmata. Edastame sellele klassile märgiga päise ja taotleme parameetreid:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Agendi õigete seadete ja funktsionaalsuse testimiseks testime skripti api_caller.py:

sudo ./api_caller.py TOKEN

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Ja me vaatame töö tulemust agendilogides ja andmevoo lennupiletite vahekaardil Jälgimine:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Nagu näete, kõik toimib ja Kinesis Agent saadab andmed edukalt voogu. Nüüd seadistame tarbija.

Kinesis Data Analyticsi seadistamine

Liigume edasi kogu süsteemi keskse komponendi juurde – looge Kinesis Data Analyticsis uus rakendus nimega kinesis_analytics_airlines_app:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Kinesis Data Analytics võimaldab teil teha Kinesis Streamsist reaalajas andmeanalüüsi, kasutades SQL-keelt. See on täielikult automaatse skaleerimise teenus (erinevalt Kinesis Streamsist), mis:

  1. võimaldab luua uusi vooge (Output Stream), mis põhinevad lähteandmete päringutel;
  2. pakub voogu rakenduste töötamise ajal ilmnenud vigadega (Error Stream);
  3. suudab automaatselt määrata sisendandmete skeemi (vajadusel saab seda käsitsi ümber defineerida).

See ei ole odav teenus – 0.11 USD töötunni kohta, nii et kasutage seda ettevaatlikult ja kustutage see, kui olete lõpetanud.

Ühendame rakenduse andmeallikaga:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Valige voog, millega ühendame (lennupiletid):

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Järgmiseks peate lisama uue IAM-i rolli, et rakendus saaks voost lugeda ja voogu kirjutada. Selleks piisab, kui mitte midagi muuta juurdepääsuõiguste plokis:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Nüüd taotleme voos oleva andmeskeemi avastamist; selleks klõpsake nuppu "Avasta skeem". Selle tulemusena värskendatakse IAM-i rolli (luuakse uus) ja voogu juba saabunud andmete põhjal käivitatakse skeemi tuvastamine:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Nüüd peate minema SQL-redaktorisse. Kui klõpsate sellel nupul, ilmub aken, mis palub teil rakendus käivitada – valige, mida soovite käivitada:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Sisestage SQL-redaktori aknasse järgmine lihtne päring ja klõpsake nuppu Salvesta ja käivita SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Relatsiooniandmebaasides töötate tabelitega, kasutades kirjete lisamiseks INSERT-lauset ja andmete päringute tegemiseks käsku SELECT. Rakenduses Amazon Kinesis Data Analytics töötate voogudega (STREAM-id) ja pumpadega (PUMP-idega) – pidevate sisestamistaotlustega, mis lisavad andmed rakenduse ühest voost teise voogu.

Ülaltoodud SQL-päring otsib Aerofloti pileteid hinnaga alla viie tuhande rubla. Kõik neile tingimustele vastavad kirjed paigutatakse voogu DESTINATION_SQL_STREAM.

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Valige plokis Sihtkoht voog special_stream ja ripploendist Rakendusesisese voo nimi DESTINATION_SQL_STREAM:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Kõikide manipulatsioonide tulemus peaks olema midagi sarnast alloleval pildil:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus

SNS-i teema loomine ja tellimine

Minge lihtsa teavitusteenuse juurde ja looge seal uus teema nimega Lennufirmad:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Liituge selle teemaga ja märkige mobiiltelefoni number, millele SMS-teated saadetakse:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus

Looge DynamoDB-s tabel

Lennupiletite voost toorandmete salvestamiseks loome DynamoDB-s sama nimega tabeli. Primaarvõtmena kasutame record_id:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus

Lambda funktsioonikollektori loomine

Loome lambda-funktsiooni nimega Collector, mille ülesandeks saab olema lennupiletite voo küsitlus ja kui sealt leitakse uusi kirjeid, siis sisestada need kirjed DynamoDB tabelisse. Ilmselgelt peab sellel lambdal olema lisaks vaikeõigustele ka lugemisõigus Kinesise andmevoogu ja kirjutamisõigus DynamoDB-le.

IAM-rolli loomine kollektori lambda-funktsioonile
Kõigepealt loome lambda jaoks uue IAM-i rolli nimega Lambda-TicketsProcessingRole:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Testnäite jaoks on eelkonfigureeritud AmazonKinesisReadOnlyAccess ja AmazonDynamoDBFullAccess poliitikad üsna sobivad, nagu on näidatud alloleval pildil:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus

See lambda peaks käivitama Kinesise päästiku, kui lennuliini_voogu sisenevad uued kirjed, seega peame lisama uue päästiku:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Jääb vaid kood sisestada ja lambda salvestada.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda funktsiooni märguande loomine

Teine lambda-funktsioon, mis jälgib teist voogu (special_stream) ja saadab teate SNS-ile, luuakse sarnaselt. Seetõttu peab sellel lambdal olema juurdepääs Kinesisest lugemiseks ja antud SNS-i teemale sõnumite saatmiseks, mis seejärel saadetakse SNS-teenuse poolt kõigile selle teema tellijatele (e-post, SMS jne).

IAM-rolli loomine
Esiteks loome selle lambda jaoks IAM-i rolli Lambda-KinesisAlarm ja seejärel määrame selle rolli loodavale alarm_notifier lambdale:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus

See lambda peaks töötama päästikul, et uued kirjed siseneksid eri_voogu, seega peate päästiku konfigureerima samal viisil, nagu tegime koguja lambda puhul.

Selle lambda konfigureerimise hõlbustamiseks tutvustame uut keskkonnamuutujat - TOPIC_ARN, kuhu paigutame Airlinesi teema ANR (Amazon Resource Names):

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Ja sisestage lambda kood, see pole üldse keeruline:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Tundub, et siin on käsitsi süsteemi seadistamine lõpetatud. Jääb üle vaid testida ja veenduda, et oleme kõik õigesti seadistanud.

Juurutage Terraformi koodist

Nõutav ettevalmistus

Terraform on väga mugav avatud lähtekoodiga tööriist infrastruktuuri juurutamiseks koodist. Sellel on oma süntaks, mida on lihtne õppida ja millel on palju näiteid selle kohta, kuidas ja mida juurutada. Atomi redaktoril või Visual Studio Code'il on palju käepäraseid pistikprogramme, mis muudavad Terraformiga töötamise lihtsamaks.

Saate distributsiooni alla laadida siit. Terraformi kõigi võimaluste üksikasjalik analüüs ei kuulu selle artikli ulatusse, seega piirdume peamiste punktidega.

Kuidas alustada

Projekti täielik kood on minu hoidlas. Kloonime hoidla endale. Enne alustamist peate veenduma, et AWS CLI on installitud ja konfigureeritud, sest... Terraform otsib mandaate failist ~/.aws/credentials.

Hea tava on käivitada käsk plaan enne kogu infrastruktuuri juurutamist, et näha, mida Terraform praegu pilves loob:

terraform.exe plan

Teil palutakse sisestada telefoninumber, millele teateid saata. Selles etapis pole seda vaja sisestada.

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Pärast programmi tööplaani analüüsimist saame alustada ressursside loomisega:

terraform.exe apply

Pärast selle käsu saatmist palutakse teil uuesti sisestada telefoninumber; kui kuvatakse küsimus toimingute tegeliku sooritamise kohta, valige "jah". See võimaldab teil seadistada kogu infrastruktuuri, teha kõik vajalikud EC2 konfiguratsioonid, juurutada lambda funktsioone jne.

Kui kõik ressursid on Terraformi koodi kaudu edukalt loodud, peate minema Kinesis Analyticsi rakenduse üksikasjadesse (kahjuks ma ei leidnud, kuidas seda otse koodist teha).

Käivitage rakendus:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Pärast seda peate rakendusesisese voo nime selgesõnaliselt määrama, valides ripploendist:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Nüüd on kõik tööks valmis.

Rakenduse testimine

Olenemata sellest, kuidas te süsteemi juurutasite, kas käsitsi või Terraformi koodi kaudu, töötab see samamoodi.

Logime SSH kaudu sisse EC2 virtuaalmasinasse, kuhu Kinesis Agent on installitud ja käivitame skripti api_caller.py

sudo ./api_caller.py TOKEN

Kõik, mida pead tegema, on oodata SMS-i oma numbrile:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
SMS - sõnum saabub teie telefoni peaaegu 1 minutiga:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus
Jääb üle vaadata, kas kirjed salvestati DynamoDB andmebaasi hilisemaks üksikasjalikumaks analüüsiks. Lennupiletite tabel sisaldab ligikaudu järgmisi andmeid:

Aviasales API integratsioon Amazon Kinesisega ja serverivaba lihtsus

Järeldus

Tehtud töö käigus ehitati Amazon Kinesisel põhinev online andmetöötlussüsteem. Kaaluti võimalusi Kinesis Agenti kasutamiseks koos Kinesise andmevoogudega ja reaalajas analüüsiga Kinesis Analytics, kasutades SQL-käske, samuti Amazon Kinesise koostoimet teiste AWS-teenustega.

Juurutasime ülaltoodud süsteemi kahel viisil: üsna pika manuaalse ja kiire Terraformi koodi järgi.

Kogu projekti lähtekood on saadaval minu GitHubi hoidlas, soovitan teil sellega tutvuda.

Mul on hea meel artiklit arutada, ootan teie kommentaare. Loodan konstruktiivset kriitikat.

Soovin teile edu!

Allikas: www.habr.com

Lisa kommentaar