Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera

Hej Habr!

Volite li letenje avionima? Volim to, ali sam se tokom samoizolacije zaljubio i u analizu podataka o avionskim kartama sa jednog poznatog resursa - Aviasales.

Danas ćemo analizirati rad Amazon Kinesis-a, izgraditi sistem za striming sa analitikom u realnom vremenu, instalirati Amazon DynamoDB NoSQL bazu podataka kao glavno skladište podataka i postaviti SMS obaveštenja za zanimljive karte.

Svi detalji su ispod reza! Idi!

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera

Uvod

Za primjer, potreban nam je pristup Aviasales API. Pristup mu je besplatan i bez ograničenja; potrebno je samo da se registrujete u odeljku „Programeri“ da biste dobili svoj API token za pristup podacima.

Glavna svrha ovog članka je dati opće razumijevanje upotrebe strujanja informacija u AWS-u; uzimamo u obzir da podaci koje vraća korišteni API nisu striktno ažurirani i prenose se iz predmemorije, koja je formirana na osnovu pretraga korisnika sajtova Aviasales.ru i Jetradar.com u poslednjih 48 sati.

Kinesis-agent, instaliran na mašini za proizvodnju, primljen preko API-ja, automatski će analizirati i prenijeti podatke u željeni stream putem Kinesis Data Analytics. Sirova verzija ovog streama će biti napisana direktno u prodavnicu. Skladište sirovih podataka raspoređeno u DynamoDB omogućit će dublju analizu ulaznica putem BI alata, kao što je AWS Quick Sight.

Razmotrit ćemo dvije opcije za postavljanje cjelokupne infrastrukture:

  • Ručno - preko AWS upravljačke konzole;
  • Infrastruktura iz Terraform koda je za lijene automatore;

Arhitektura razvijenog sistema

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Korištene komponente:

  • Aviasales API — podaci koje vrati ovaj API će se koristiti za sav naredni rad;
  • EC2 Instanca proizvođača — obična virtuelna mašina u oblaku na kojoj će se generisati ulazni tok podataka:
    • Kinesis Agent je Java aplikacija instalirana lokalno na mašini koja pruža jednostavan način za prikupljanje i slanje podataka u Kinesis (Kinesis Data Streams ili Kinesis Firehose). Agent stalno prati skup datoteka u navedenim direktorijima i šalje nove podatke u Kinesis;
    • API Caller Script — Python skripta koja postavlja zahtjeve API-ju i stavlja odgovor u mapu koju nadgleda Kinesis Agent;
  • Kinesis tokovi podataka — usluga strimovanja podataka u realnom vremenu sa širokim mogućnostima skaliranja;
  • Kinesis Analytics je usluga bez servera koja pojednostavljuje analizu striming podataka u realnom vremenu. Amazon Kinesis Data Analytics konfigurira resurse aplikacije i automatski skalira za rukovanje bilo kojom količinom dolaznih podataka;
  • AWS Lambda — usluga koja vam omogućava da pokrenete kod bez pravljenja rezervnih kopija ili podešavanja servera. Sva računarska snaga se automatski skalira za svaki poziv;
  • Amazon DynamoDB - Baza podataka parova ključ/vrijednost i dokumenata koja pruža kašnjenje manje od 10 milisekundi kada se radi na bilo kojoj skali. Kada koristite DynamoDB, ne morate obezbjeđivati, krpiti ili upravljati serverima. DynamoDB automatski skalira tabele kako bi prilagodio količinu dostupnih resursa i održao visoke performanse. Nije potrebna sistemska administracija;
  • Amazon SNS - potpuno upravljana usluga za slanje poruka po modelu izdavač-pretplatnik (Pub/Sub), sa kojom možete izolovati mikroservise, distribuirane sisteme i aplikacije bez servera. SNS se može koristiti za slanje informacija krajnjim korisnicima putem mobilnih push obavijesti, SMS poruka i e-pošte.

Inicijalna obuka

Da bih emulirao protok podataka, odlučio sam da koristim informacije o avionskim kartama koje je vratio Aviasales API. IN dokumentaciju prilično opsežna lista različitih metoda, uzmimo jednu od njih - "Mjesečni kalendar cijena", koji vraća cijene za svaki dan u mjesecu, grupirane po broju transfera. Ako u zahtjevu ne navedete mjesec traženja, podaci će biti vraćeni za mjesec koji slijedi nakon tekućeg.

Dakle, hajde da se registrujemo i dobijemo naš token.

Primjer zahtjeva je u nastavku:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Gornji način primanja podataka od API-ja navođenjem tokena u zahtjevu će funkcionirati, ali ja radije propuštam pristupni token kroz zaglavlje, pa ćemo ovaj metod koristiti u skripti api_caller.py.

Primjer odgovora:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Primjer API odgovora iznad pokazuje kartu od Sankt Peterburga do Phuk... Oh, kakav san...
Pošto sam ja iz Kazanja, a Phuket je sada „samo san“, hajde da potražimo karte od Sankt Peterburga do Kazanja.

Pretpostavlja se da već imate AWS nalog. Posebno bih odmah skrenuo pažnju na činjenicu da Kinesis i slanje obavještenja putem SMS-a nisu uključeni u godišnji Besplatan nivo (besplatno korištenje). Ali čak i uprkos tome, sa par dolara na umu, sasvim je moguće izgraditi predloženi sistem i igrati se s njim. I, naravno, ne zaboravite izbrisati sve resurse nakon što više nisu potrebni.

Na sreću, DynamoDb i lambda funkcije će biti besplatne za nas ako ispunimo naše mjesečne besplatne limite. Na primjer, za DynamoDB: 25 GB prostora za pohranu, 25 WCU/RCU i 100 miliona upita. I milion poziva lambda funkcija mjesečno.

Ručna implementacija sistema

Postavljanje Kinesis Data Streams

Idemo na uslugu Kinesis Data Streams i kreiramo dva nova toka, po jedan dio za svaki.

Šta je krhotina?
Shard je osnovna jedinica za prijenos podataka Amazon Kinesis toka. Jedan segment omogućava prijenos ulaznih podataka brzinom od 1 MB/s i prijenos izlaznih podataka brzinom od 2 MB/s. Jedan segment podržava do 1000 PUT unosa u sekundi. Prilikom kreiranja toka podataka potrebno je navesti potreban broj segmenata. Na primjer, možete kreirati tok podataka sa dva segmenta. Ovaj tok podataka će omogućiti prijenos ulaznih podataka brzinom od 2 MB/s i prijenos izlaznih podataka brzinom od 4 MB/s, podržavajući do 2000 PUT zapisa u sekundi.

Što je više fragmenata u vašem streamu, veća je njegova propusnost. U principu, ovako se skaliraju tokovi - dodavanjem krhotina. Ali što više komadića imate, to je veća cijena. Svaki dio košta 1,5 centi po satu i dodatnih 1.4 centa za svaki milion PUT jedinica korisnog tereta.

Kreirajmo novi stream sa imenom airline_tickets, 1 krhotina će mu biti dovoljna:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Sada napravimo drugu nit sa imenom special_stream:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera

Postavljanje proizvođača

Za analizu zadatka dovoljno je koristiti običnu EC2 instancu kao proizvođač podataka. To ne mora biti moćna, skupa virtuelna mašina; spot t2.micro će biti u redu.

Važna napomena: na primjer, trebali biste koristiti image - Amazon Linux AMI 2018.03.0, ima manje postavki za brzo pokretanje Kinesis Agenta.

Idite na uslugu EC2, kreirajte novu virtuelnu mašinu, izaberite željeni AMI sa tipom t2.micro, koji je uključen u besplatni nivo:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Da bi novostvorena virtuelna mašina mogla da komunicira sa uslugom Kinesis, mora da ima prava za to. Najbolji način da to učinite je da dodijelite IAM ulogu. Stoga, na ekranu Korak 3: Konfiguracija detalja instance, trebate odabrati Kreirajte novu IAM ulogu:

Kreiranje IAM uloge za EC2
Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
U prozoru koji se otvori odaberite da kreiramo novu ulogu za EC2 i idite na odjeljak Dozvole:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Koristeći primjer obuke, ne moramo ulaziti u sve zamršenosti granularne konfiguracije prava na resurse, pa ćemo odabrati politike koje je Amazon unaprijed konfigurirao: AmazonKinesisFullAccess i CloudWatchFullAccess.

Dajmo neko smisleno ime za ovu ulogu, na primjer: EC2-KinesisStreams-FullAccess. Rezultat bi trebao biti isti kao što je prikazano na slici ispod:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Nakon kreiranja ove nove uloge, ne zaboravite je priložiti kreiranoj instanci virtuelne mašine:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Ne mijenjamo ništa drugo na ovom ekranu i prelazimo na sljedeće prozore.

Postavke tvrdog diska se mogu ostaviti kao zadane, kao i oznake (iako je dobra praksa koristiti oznake, barem dajte instanci ime i označite okruženje).

Sada smo na kartici Korak 6: Konfigurišite bezbednosnu grupu, gde morate da kreirate novu ili navedete postojeću bezbednosnu grupu, koja vam omogućava da se povežete preko ssh-a (port 22) na instancu. Odaberite Izvor -> Moj IP tamo i možete pokrenuti instancu.

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Čim se prebaci u status pokretanja, možete pokušati da se povežete na njega putem ssh-a.

Da biste mogli raditi sa Kinesis Agentom, nakon uspješnog povezivanja na mašinu, morate unijeti sljedeće naredbe u terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Kreirajmo folder za spremanje API odgovora:

sudo mkdir /var/log/airline_tickets

Prije pokretanja agenta, morate konfigurirati njegovu konfiguraciju:

sudo vim /etc/aws-kinesis/agent.json

Sadržaj fajla agent.json bi trebao izgledati ovako:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Kao što se može vidjeti iz konfiguracijske datoteke, agent će pratiti datoteke sa ekstenzijom .log u direktoriju /var/log/airline_tickets/, analizirati ih i prenijeti u tok airline_tickets.

Ponovo pokrećemo servis i uvjeravamo se da je pokrenut i radi:

sudo service aws-kinesis-agent restart

Sada preuzmimo Python skriptu koja će tražiti podatke od API-ja:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skripta api_caller.py traži podatke od Aviasales-a i sprema primljeni odgovor u direktorij koji Kinesis agent skenira. Implementacija ove skripte je sasvim standardna, postoji klasa TicketsApi, koja vam omogućava da asinhrono povučete API. Ovoj klasi prosljeđujemo zaglavlje sa tokenom i tražimo parametre:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Da testiramo ispravne postavke i funkcionalnost agenta, testirajmo skriptu api_caller.py:

sudo ./api_caller.py TOKEN

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
I gledamo rezultat rada u zapisnicima agenta i na kartici Monitoring u toku podataka airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Kao što vidite, sve radi i Kinesis Agent uspješno šalje podatke u stream. Sada konfigurirajmo potrošača.

Postavljanje Kinesis Data Analytics

Prijeđimo na središnju komponentu cijelog sistema - kreirajte novu aplikaciju u Kinesis Data Analytics pod nazivom kinesis_analytics_airlines_app:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Kinesis Data Analytics omogućava vam da izvršite analizu podataka u realnom vremenu iz Kinesis Streams koristeći SQL jezik. To je usluga potpuno automatskog skaliranja (za razliku od Kinesis Streams) koja:

  1. omogućava vam da kreirate nove tokove (Output Stream) na osnovu zahteva za izvornim podacima;
  2. pruža stream s greškama koje su se dogodile dok su aplikacije bile pokrenute (Error Stream);
  3. može automatski odrediti šemu ulaznih podataka (može se ručno redefinirati ako je potrebno).

Ovo nije jeftina usluga - 0.11 USD po satu rada, pa je pažljivo koristite i obrišite kada završite.

Povežimo aplikaciju sa izvorom podataka:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Odaberite stream na koji ćemo se povezati (airline_tickets):

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Zatim morate priložiti novu IAM ulogu tako da aplikacija može čitati iz streama i pisati u stream. Da biste to učinili, dovoljno je ne mijenjati ništa u bloku Dozvole za pristup:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Sada zatražimo otkrivanje šeme podataka u toku; da biste to učinili, kliknite na dugme „Otkrij šemu“. Kao rezultat toga, uloga IAM-a će biti ažurirana (kreirana) i otkrivanje sheme će se pokrenuti iz podataka koji su već stigli u stream:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Sada morate otići u SQL editor. Kada kliknete na ovo dugme, pojaviće se prozor u kojem se traži da pokrenete aplikaciju - izaberite šta želite da pokrenete:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Umetnite sljedeći jednostavan upit u prozor SQL editora i kliknite na Spremi i pokreni SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

U relacionim bazama podataka radite sa tabelama koristeći INSERT izraze za dodavanje zapisa i SELECT naredbu za upit podataka. U Amazon Kinesis Data Analytics radite sa tokovima (STREAM) i pumpama (PUMP)—zahtjevi za kontinuirano umetanje koji ubacuju podatke iz jednog toka u aplikaciji u drugi tok.

Gore prikazan SQL upit traži karte Aeroflota po cijeni ispod pet hiljada rubalja. Svi zapisi koji ispunjavaju ove uslove bit će smješteni u DESTINATION_SQL_STREAM tok.

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
U bloku Destination izaberite stream special_stream, a na padajućoj listi naziv toka u aplikaciji DESTINATION_SQL_STREAM:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Rezultat svih manipulacija trebao bi biti nešto slično slici ispod:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera

Kreiranje i pretplata na SNS temu

Idite na Simple Notification Service i tamo kreirajte novu temu pod nazivom Airlines:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Pretplatite se na ovu temu i naznačite broj mobilnog telefona na koji će se slati SMS obavještenja:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera

Kreirajte tabelu u DynamoDB

Da pohranimo neobrađene podatke iz njihovog toka airline_tickets, napravimo tabelu u DynamoDB sa istim imenom. Koristit ćemo record_id kao primarni ključ:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera

Kreiranje kolektora lambda funkcija

Kreirajmo lambda funkciju nazvanu Collector, čiji će zadatak biti da ispita tok airline_tickets i, ako se tamo pronađu novi zapisi, ubaci ove zapise u DynamoDB tabelu. Očigledno, pored zadanih prava, ova lambda mora imati pristup za čitanje Kinesis toka podataka i pristup za pisanje u DynamoDB.

Kreiranje IAM uloge za kolektorsku lambda funkciju
Prvo, kreirajmo novu IAM ulogu za lambda pod nazivom Lambda-TicketsProcessingRole:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Za primjer testa, unaprijed konfigurirane politike AmazonKinesisReadOnlyAccess i AmazonDynamoDBFullAccess su sasvim prikladne, kao što je prikazano na slici ispod:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera

Ova lambda bi trebala biti pokrenuta okidačem iz Kinesisa kada novi unosi uđu u airline_stream, tako da moramo dodati novi okidač:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Ostaje samo da ubacite kod i sačuvate lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Kreiranje notifiera lambda funkcije

Druga lambda funkcija, koja će pratiti drugi stream (special_stream) i slati obavještenje SNS-u, kreirana je na sličan način. Dakle, ova lambda mora imati pristup za čitanje iz Kinesisa i slanje poruka na datu SNS temu, koje će zatim SNS servis poslati svim pretplatnicima ove teme (e-mail, SMS, itd.).

Kreiranje IAM uloge
Prvo kreiramo IAM ulogu Lambda-KinesisAlarm za ovu lambdu, a zatim dodjeljujemo ovu ulogu lambdi alarm_notifier koja se kreira:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera

Ova lambda bi trebala raditi na okidaču za nove zapise za ulazak u special_stream, tako da trebate konfigurirati okidač na isti način kao što smo radili za Collector lambda.

Da bismo olakšali konfiguraciju ove lambda, uvedimo novu varijablu okruženja - TOPIC_ARN, gdje postavljamo ANR (Amazon Recourse Names) teme Airlines:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
I ubacite lambda kod, nije nimalo komplikovano:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Čini se da je tu završena ručna konfiguracija sistema. Ostaje samo da testiramo i uvjerimo se da smo sve ispravno konfigurirali.

Postavite iz Terraform koda

Potrebna priprema

Terraform je vrlo zgodan alat otvorenog koda za postavljanje infrastrukture iz koda. Ima vlastitu sintaksu koju je lako naučiti i ima mnogo primjera kako i šta implementirati. Atom editor ili Visual Studio Code ima mnogo praktičnih dodataka koji olakšavaju rad sa Terraformom.

Možete preuzeti distribuciju odavde. Detaljna analiza svih mogućnosti Terraforma je izvan okvira ovog članka, pa ćemo se ograničiti na glavne tačke.

Kako započeti

Potpuni kod projekta je u mom spremištu. Mi kloniramo spremište za sebe. Prije nego što počnete, morate biti sigurni da imate instaliran i konfiguriran AWS CLI, jer... Terraform će tražiti vjerodajnice u ~/.aws/credentials datoteci.

Dobra praksa je da pokrenete naredbu plana prije postavljanja cijele infrastrukture kako biste vidjeli šta Terraform trenutno stvara za nas u oblaku:

terraform.exe plan

Od vas će se tražiti da unesete broj telefona na koji ćete slati obavještenja. U ovoj fazi nije potrebno unositi ga.

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Nakon analize plana rada programa, možemo započeti kreiranje resursa:

terraform.exe apply

Nakon slanja ove komande, od vas će se ponovo tražiti da unesete broj telefona; birajte „da“ kada se prikaže pitanje o stvarnom izvođenju radnji. To će vam omogućiti da postavite cjelokupnu infrastrukturu, izvršite svu potrebnu konfiguraciju EC2, implementirate lambda funkcije, itd.

Nakon što su svi resursi uspješno kreirani putem Terraform koda, morate ući u detalje aplikacije Kinesis Analytics (nažalost, nisam pronašao kako to učiniti direktno iz koda).

Pokrenite aplikaciju:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Nakon ovoga, morate eksplicitno postaviti naziv toka u aplikaciji odabirom sa padajuće liste:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Sada je sve spremno za rad.

Testiranje aplikacije

Bez obzira na to kako ste implementirali sistem, ručno ili putem Terraform koda, on će raditi isto.

Prijavljujemo se preko SSH-a na EC2 virtuelnu mašinu na kojoj je instaliran Kinesis Agent i pokrećemo skriptu api_caller.py

sudo ./api_caller.py TOKEN

Sve što treba da uradite je da sačekate SMS na vaš broj:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
SMS - poruka stiže na telefon za skoro 1 minut:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Ostaje da se vidi da li su zapisi sačuvani u DynamoDB bazi podataka radi naknadne, detaljnije analize. Tabela airline_tickets sadrži otprilike sljedeće podatke:

Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera

zaključak

U toku obavljenog posla izgrađen je onlajn sistem za obradu podataka baziran na Amazon Kinesis. Razmotrene su opcije za korištenje Kinesis Agenta u kombinaciji s Kinesis Data Streams i analitikom u realnom vremenu Kinesis Analytics korištenjem SQL komandi, kao i interakcija Amazon Kinesisa s drugim AWS servisima.

Gornji sistem smo implementirali na dva načina: prilično dugačak ručni i brzi iz Terraform koda.

Svi izvorni kodovi projekta su dostupni u mom GitHub spremištu, predlažem da se upoznate s tim.

Drago mi je da raspravljam o članku, radujem se vašim komentarima. Nadam se konstruktivnoj kritici.

Želim vam uspeh!

izvor: www.habr.com

Dodajte komentar