Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja

Hej Habr!

Volite li letjeti avionima? Volim to, ali tijekom samoizolacije sam se zaljubio i u analiziranje podataka o avio kartama jednog poznatog izvora - Aviasales.

Danas ćemo analizirati rad Amazon Kinesisa, izgraditi streaming sustav s analitikom u stvarnom vremenu, instalirati Amazon DynamoDB NoSQL bazu podataka kao glavnu pohranu podataka te postaviti SMS obavijesti za zanimljive tikete.

Svi detalji su ispod kroja! Ići!

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja

Uvod

Za primjer, trebamo pristup Aviasales API. Pristup mu je besplatan i bez ograničenja; samo se trebate registrirati u odjeljku "Programeri" da biste dobili svoj API token za pristup podacima.

Glavna svrha ovog članka je pružiti opće razumijevanje upotrebe strujanja informacija u AWS-u; uzimamo u obzir da podaci koje vraća korišteni API nisu strogo ažurni i prenose se iz predmemorije, što je formirana na temelju pretraživanja korisnika stranica Aviasales.ru i Jetradar.com u zadnjih 48 sati.

Kinesis-agent, instaliran na stroju za proizvodnju, primljen putem API-ja automatski će analizirati i prenijeti podatke u željeni tok putem Kinesis Data Analytics. Neobrađena verzija ovog streama bit će zapisana izravno u trgovinu. Pohrana neobrađenih podataka postavljena u DynamoDB omogućit će dublju analizu ulaznica putem BI alata, kao što je AWS Quick Sight.

Razmotrit ćemo dvije mogućnosti za implementaciju cijele infrastrukture:

  • Ručno - preko AWS Management Console;
  • Infrastruktura iz Terraform koda je za lijene automatiste;

Arhitektura razvijenog sustava

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Korištene komponente:

  • Aviasales API — podaci koje vraća ovaj API koristit će se za sav daljnji rad;
  • Instanca proizvođača EC2 — obični virtualni stroj u oblaku na kojem će se generirati ulazni tok podataka:
    • Kinesis agent je Java aplikacija instalirana lokalno na stroju koja omogućuje jednostavan način prikupljanja i slanja podataka u Kinesis (Kinesis Data Streams ili Kinesis Firehose). Agent stalno prati skup datoteka u navedenim direktorijima i šalje nove podatke Kinesisu;
    • API pozivatelj skripte — Python skripta koja šalje zahtjeve API-ju i stavlja odgovor u mapu koju nadzire Kinesis Agent;
  • Kinesis tokovi podataka — usluga strujanja podataka u stvarnom vremenu sa širokim mogućnostima skaliranja;
  • Analitika kineze je usluga bez poslužitelja koja pojednostavljuje analizu strujanja podataka u stvarnom vremenu. Amazon Kinesis Data Analytics konfigurira resurse aplikacije i automatski se skalira za obradu bilo koje količine dolaznih podataka;
  • AWS Lambda — usluga koja vam omogućuje pokretanje koda bez sigurnosne kopije ili postavljanja poslužitelja. Sva računalna snaga automatski se skalira za svaki poziv;
  • Amazon DynamoDB - Baza podataka parova ključ-vrijednost i dokumenata koja omogućuje kašnjenje manje od 10 milisekundi pri izvođenju na bilo kojoj razini. Kada koristite DynamoDB, ne trebate osiguravati, krpati niti upravljati bilo kojim poslužiteljem. DynamoDB automatski skalira tablice kako bi prilagodio količinu dostupnih resursa i održao visoku izvedbu. Nije potrebna administracija sustava;
  • Amazon SNS - potpuno upravljana usluga za slanje poruka po modelu izdavač-pretplatnik (Pub/Sub), s kojom možete izolirati mikroservise, distribuirane sustave i aplikacije bez poslužitelja. SNS se može koristiti za slanje informacija krajnjim korisnicima putem mobilnih push obavijesti, SMS poruka i e-pošte.

Početna obuka

Kako bih emulirao tijek podataka, odlučio sam upotrijebiti informacije o zrakoplovnim kartama koje je vratio Aviasales API. U dokumentacija prilično opsežan popis različitih metoda, uzmimo jednu od njih - "Mjesečni kalendar cijena", koji vraća cijene za svaki dan u mjesecu, grupirane prema broju prijenosa. Ako u zahtjevu ne navedete mjesec pretraživanja, podaci će biti vraćeni za mjesec koji slijedi nakon tekućeg.

Dakle, registrirajmo se i uzmimo svoj token.

Primjer zahtjeva je u nastavku:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Gornja metoda primanja podataka iz API-ja navođenjem tokena u zahtjevu će funkcionirati, ali ja radije prosljeđujem pristupni token kroz zaglavlje, pa ćemo ovu metodu koristiti u skripti api_caller.py.

Primjer odgovora:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Primjer gore navedenog API odgovora prikazuje kartu od St. Petersburga do Phuka... Oh, kakav san...
Budući da sam iz Kazana, a Phuket je sada "samo san", potražimo karte od St. Petersburga do Kazana.

Pretpostavlja se da već imate AWS račun. Posebno bih odmah skrenuo pozornost da Kinesis i slanje obavijesti putem SMS-a nisu uključeni u godišnji Besplatna razina (besplatna upotreba). Ali čak i unatoč tome, s nekoliko dolara na umu, sasvim je moguće izgraditi predloženi sustav i igrati se s njim. I, naravno, ne zaboravite izbrisati sve resurse nakon što više nisu potrebni.

Srećom, funkcije DynamoDb i lambda bit će besplatne za nas ako ispunimo svoja mjesečna besplatna ograničenja. Na primjer, za DynamoDB: 25 GB prostora za pohranu, 25 WCU/RCU i 100 milijuna upita. I milijun poziva lambda funkcija mjesečno.

Ručno postavljanje sustava

Postavljanje protoka Kinesis podataka

Idemo na uslugu Kinesis Data Streams i stvorimo dva nova toka, po jedan shard za svaki.

Što je shard?
Shard je osnovna jedinica prijenosa podataka Amazon Kinesis streama. Jedan segment osigurava ulazni prijenos podataka brzinom od 1 MB/s i izlazni prijenos podataka brzinom od 2 MB/s. Jedan segment podržava do 1000 PUT unosa u sekundi. Prilikom izrade podatkovnog toka potrebno je navesti potreban broj segmenata. Na primjer, možete stvoriti tok podataka s dva segmenta. Ovaj tok podataka omogućit će ulazni prijenos podataka pri 2 MB/s i izlazni prijenos podataka pri 4 MB/s, podržavajući do 2000 PUT zapisa u sekundi.

Što je više fragmenata u vašem streamu, veća je njegova propusnost. U principu se tokovi skaliraju - dodavanjem fragmenata. Ali što više fragmenata imate, to je viša cijena. Svaki shard košta 1,5 centi po satu i dodatnih 1.4 centa za svakih milijun PUT jedinica korisnog opterećenja.

Kreirajmo novi stream s imenom avio karte, 1 shard će mu biti dovoljan:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Kreirajmo sada drugu nit s tim imenom posebni_tok:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja

Postavljanje proizvođača

Za analizu zadatka dovoljno je koristiti običnu EC2 instancu kao proizvođača podataka. To ne mora biti snažan, skup virtualni stroj; točkasti t2.micro će biti sasvim u redu.

Važna napomena: na primjer, trebali biste koristiti sliku - Amazon Linux AMI 2018.03.0, ima manje postavki za brzo pokretanje Kinesis Agenta.

Idite na EC2 servis, kreirajte novi virtualni stroj, odaberite željeni AMI s tipom t2.micro koji je uključen u Free Tier:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Kako bi novostvoreni virtualni stroj mogao komunicirati s uslugom Kinesis, potrebno mu je dati prava za to. Najbolji način da to učinite je dodijeliti IAM ulogu. Stoga biste na zaslonu Korak 3: Konfigurirajte pojedinosti instance trebali odabrati Stvorite novu IAM ulogu:

Stvaranje IAM uloge za EC2
Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
U prozoru koji se otvori odaberite da stvaramo novu ulogu za EC2 i idite na odjeljak Dozvole:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Koristeći primjer obuke, ne moramo ulaziti u sve zamršenosti detaljne konfiguracije prava na resurse, pa ćemo odabrati pravila koja je unaprijed konfigurirao Amazon: AmazonKinesisFullAccess i CloudWatchFullAccess.

Dajmo neko smisleno ime za ovu ulogu, na primjer: EC2-KinesisStreams-FullAccess. Rezultat bi trebao biti isti kao što je prikazano na slici ispod:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Nakon što stvorite ovu novu ulogu, nemojte je zaboraviti priložiti stvorenoj instanci virtualnog stroja:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Ne mijenjamo ništa više na ovom zaslonu i prelazimo na sljedeće prozore.

Postavke tvrdog diska mogu se ostaviti kao zadane, kao i oznake (iako je dobra praksa koristiti oznake, barem dodijelite naziv instanci i označite okruženje).

Sada smo na kartici Korak 6: Konfigurirajte sigurnosnu grupu, gdje trebate stvoriti novu ili navesti postojeću sigurnosnu grupu, koja vam omogućuje povezivanje putem ssh-a (port 22) na instancu. Tamo odaberite Izvor -> Moj IP i možete pokrenuti instancu.

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Čim se prebaci u status rada, možete se pokušati spojiti na njega putem ssh-a.

Kako biste mogli raditi s Kinesis Agentom, nakon uspješnog spajanja na uređaj morate unijeti sljedeće naredbe u terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Kreirajmo mapu za spremanje API odgovora:

sudo mkdir /var/log/airline_tickets

Prije pokretanja agenta morate konfigurirati njegovu konfiguraciju:

sudo vim /etc/aws-kinesis/agent.json

Sadržaj datoteke agent.json trebao bi izgledati ovako:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Kao što se može vidjeti iz konfiguracijske datoteke, agent će nadzirati datoteke s ekstenzijom .log u direktoriju /var/log/airline_tickets/, analizirati ih i prenijeti u tok airline_tickets.

Ponovno pokrećemo uslugu i uvjeravamo se da radi i radi:

sudo service aws-kinesis-agent restart

Sada preuzmimo Python skriptu koja će tražiti podatke iz API-ja:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skripta api_caller.py traži podatke od Aviasalesa i sprema primljeni odgovor u direktorij koji Kinesis agent skenira. Implementacija ove skripte prilično je standardna, postoji klasa TicketsApi, omogućuje vam asinkrono povlačenje API-ja. Ovoj klasi prosljeđujemo zaglavlje s tokenom i zahtijevamo parametre:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Kako bismo testirali ispravne postavke i funkcionalnost agenta, testirajmo skriptu api_caller.py:

sudo ./api_caller.py TOKEN

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
I gledamo rezultat rada u zapisnicima agenta i na kartici Praćenje u toku podataka airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Kao što vidite, sve radi i Kinesis Agent uspješno šalje podatke u stream. Sada konfigurirajmo potrošača.

Postavljanje Kinesis Data Analytics

Prijeđimo na središnju komponentu cijelog sustava - kreirajte novu aplikaciju u Kinesis Data Analytics pod nazivom kinesis_analytics_airlines_app:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Kinesis Data Analytics omogućuje vam izvođenje analitike podataka u stvarnom vremenu iz Kinesis Streamsa pomoću SQL jezika. To je usluga potpunog automatskog skaliranja (za razliku od Kinesis Streamsa) koja:

  1. omogućuje stvaranje novih tokova (Output Stream) na temelju zahtjeva za izvor podataka;
  2. pruža stream s pogreškama koje su se dogodile tijekom rada aplikacija (Error Stream);
  3. može automatski odrediti shemu ulaznih podataka (po potrebi se može ručno redefinirati).

Ovo nije jeftina usluga - 0.11 USD po satu rada, pa je koristite pažljivo i obrišite kada završite.

Povežimo aplikaciju s izvorom podataka:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Odaberite stream na koji ćemo se spojiti (avionske_karte):

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Zatim trebate priložiti novu IAM ulogu kako bi aplikacija mogla čitati iz toka i pisati u tok. Da biste to učinili, dovoljno je ne mijenjati ništa u bloku Dopuštenja pristupa:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Zatražimo sada otkrivanje sheme podataka u streamu; da biste to učinili, kliknite na gumb "Otkrij shemu". Kao rezultat toga, IAM uloga će se ažurirati (stvorit će se nova) i pokrenut će se otkrivanje sheme iz podataka koji su već stigli u stream:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Sada morate otići u SQL editor. Kada kliknete na ovaj gumb, pojavit će se prozor s upitom da pokrenete aplikaciju - odaberite što želite pokrenuti:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Umetnite sljedeći jednostavni upit u prozor SQL uređivača i kliknite Spremi i pokreni SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

U relacijskim bazama podataka radite s tablicama pomoću naredbi INSERT za dodavanje zapisa i naredbe SELECT za upit podataka. U Amazon Kinesis Data Analyticsu radite s tokovima (STREAMs) i pumpama (PUMPs)—kontinuiranim zahtjevima za umetanje koji umeću podatke iz jednog toka u aplikaciji u drugi tok.

Gornji SQL upit traži karte za Aeroflot po cijeni ispod pet tisuća rubalja. Svi zapisi koji zadovoljavaju ove uvjete bit će smješteni u tok DESTINATION_SQL_STREAM.

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
U bloku Destination odaberite tok special_stream, a na padajućem popisu In-application stream name DESTINATION_SQL_STREAM:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Rezultat svih manipulacija trebao bi biti nešto slično donjoj slici:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja

Stvaranje i pretplata na SNS temu

Idite na Simple Notification Service i tamo stvorite novu temu pod nazivom Airlines:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Pretplatite se na ovu temu i navedite broj mobitela na koji će se slati SMS obavijesti:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja

Napravite tablicu u DynamoDB-u

Kako bismo pohranili neobrađene podatke iz njihovog toka airline_tickets, stvorimo tablicu u DynamoDB-u s istim imenom. Koristit ćemo record_id kao primarni ključ:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja

Stvaranje kolektora lambda funkcija

Kreirajmo lambda funkciju pod nazivom Collector, čija će zadaća biti propitivanje streama airline_tickets i, ako se tamo pronađu novi zapisi, umetanje tih zapisa u DynamoDB tablicu. Očito, osim zadanih prava, ova lambda mora imati pristup čitanju protoka podataka Kinesis i pristup pisanju DynamoDB-u.

Stvaranje IAM uloge za kolektorsku lambda funkciju
Prvo, stvorimo novu IAM ulogu za lambda pod nazivom Lambda-TicketsProcessingRole:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Za testni primjer, unaprijed konfigurirana pravila AmazonKinesisReadOnlyAccess i AmazonDynamoDBFullAccess sasvim su prikladna, kao što je prikazano na slici ispod:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja

Ovu lambdu trebao bi pokrenuti okidač iz Kinesisa kada novi unosi uđu u airline_stream, pa moramo dodati novi okidač:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Ostaje samo ubaciti kod i spremiti lambdu.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Izrada obavijesti o lambda funkciji

Druga lambda funkcija, koja će nadzirati drugi tok (special_stream) i poslati obavijest SNS-u, kreira se na sličan način. Dakle, ova lambda mora imati pristup čitanju iz Kinesisa i slanju poruka na zadanu SNS temu, koje će onda SNS usluga slati svim pretplatnicima ove teme (e-mail, SMS, itd.).

Stvaranje IAM uloge
Najprije stvaramo IAM ulogu Lambda-KinesisAlarm za ovu lambdu, a zatim tu ulogu dodjeljujemo lambda alarm_notifieru koji se stvara:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja

Ova lambda bi trebala raditi na okidaču za nove zapise da uđu u special_stream, tako da morate konfigurirati okidač na isti način kao što smo mi učinili za Collector lambda.

Kako bismo olakšali konfiguriranje ove lambda, uvedimo novu varijablu okruženja - TOPIC_ARN, gdje postavljamo ANR (Amazon Recourse Names) teme Airlines:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
I umetnite lambda kod, uopće nije komplicirano:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Čini se da je tu ručna konfiguracija sustava završena. Ostaje samo testirati i uvjeriti se da smo sve ispravno konfigurirali.

Implementiraj iz koda Terraform

Obavezna priprema

Terraform je vrlo zgodan alat otvorenog koda za postavljanje infrastrukture iz koda. Ima vlastitu sintaksu koju je lako naučiti i ima mnogo primjera kako i što implementirati. Atom editor ili Visual Studio Code ima mnogo praktičnih dodataka koji olakšavaju rad s Terraformom.

Distribuciju možete preuzeti stoga. Detaljna analiza svih Terraform mogućnosti je izvan opsega ovog članka, pa ćemo se ograničiti na glavne točke.

Kako započeti

Puni kod projekta je u mom spremištu. Kloniramo spremište prema sebi. Prije početka, morate biti sigurni da imate instaliran i konfiguriran AWS CLI, jer... Terraform će tražiti vjerodajnice u datoteci ~/.aws/credentials.

Dobra praksa je pokrenuti naredbu plana prije postavljanja cijele infrastrukture da vidimo što Terraform trenutno stvara za nas u oblaku:

terraform.exe plan

Od vas će se tražiti da unesete telefonski broj na koji ćete slati obavijesti. Nije ga potrebno unijeti u ovoj fazi.

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Nakon što smo analizirali plan rada programa, možemo započeti sa stvaranjem resursa:

terraform.exe apply

Nakon slanja ove naredbe, od vas će se ponovno tražiti da unesete telefonski broj; birajte "da" kada se prikaže pitanje o stvarnom izvođenju radnji. To će vam omogućiti da postavite cjelokupnu infrastrukturu, provedete svu potrebnu konfiguraciju EC2, postavite lambda funkcije itd.

Nakon što su svi resursi uspješno kreirani kroz Terraform kod, potrebno je ući u detalje aplikacije Kinesis Analytics (nažalost, nisam našao kako to učiniti izravno iz koda).

Pokrenite aplikaciju:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Nakon toga morate izričito postaviti naziv toka unutar aplikacije odabirom s padajućeg popisa:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Sada je sve spremno za polazak.

Testiranje aplikacije

Bez obzira na to kako ste postavili sustav, ručno ili putem Terraform koda, radit će isto.

Logiramo se putem SSH-a na EC2 virtualni stroj na kojem je instaliran Kinesis Agent i pokrećemo skriptu api_caller.py

sudo ./api_caller.py TOKEN

Sve što trebate učiniti je pričekati SMS na vaš broj:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
SMS - poruka stiže na vaš telefon za skoro 1 minutu:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Ostaje vidjeti jesu li zapisi spremljeni u DynamoDB bazu podataka za naknadnu, detaljniju analizu. Tablica airline_tickets sadrži otprilike sljedeće podatke:

Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja

Zaključak

Tijekom obavljenog posla izgrađen je online sustav za obradu podataka baziran na Amazon Kinesis. Razmotrene su mogućnosti korištenja Kinesis Agenta u kombinaciji s Kinesis Data Streams i analitikom Kinesis Analytics u stvarnom vremenu pomoću SQL naredbi, kao i interakcija Amazon Kinesis s drugim AWS servisima.

Gornji sustav implementirali smo na dva načina: prilično dugačak ručni i brzi iz Terraform koda.

Sav izvorni kod projekta je dostupan u mom GitHub repozitoriju, predlažem da se upoznate s njim.

Rado ću raspravljati o članku, veselim se vašim komentarima. Nadam se konstruktivnoj kritici.

Želim vam uspjeh!

Izvor: www.habr.com

Dodajte komentar