Integrare API Aviasales cu Amazon Kinesis și simplitate fără server

Hei Habr!

Îți place să zbori cu avioane? Îmi place, dar în timpul autoizolării m-am îndrăgostit și de analizarea datelor despre biletele de avion dintr-o resursă binecunoscută - Aviasales.

Astăzi vom analiza activitatea Amazon Kinesis, vom construi un sistem de streaming cu analize în timp real, vom instala baza de date Amazon DynamoDB NoSQL ca principală stocare a datelor și vom configura notificări prin SMS pentru bilete interesante.

Toate detaliile sunt sub tăietură! Merge!

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server

Introducere

De exemplu, avem nevoie de acces la Aviasales API. Accesul la acesta este oferit gratuit și fără restricții; trebuie doar să vă înregistrați în secțiunea „Dezvoltatori” pentru a primi tokenul API pentru a accesa datele.

Scopul principal al acestui articol este de a oferi o înțelegere generală a utilizării fluxului de informații în AWS; luăm în considerare faptul că datele returnate de API-ul utilizat nu sunt strict actualizate și sunt transmise din cache, care este format pe baza căutărilor efectuate de utilizatorii site-urilor Aviasales.ru și Jetradar.com în ultimele 48 de ore.

Kinesis-agent, instalat pe mașina producătoare, primit prin API va analiza și transmite automat datele către fluxul dorit prin Kinesis Data Analytics. Versiunea brută a acestui flux va fi scrisă direct în magazin. Stocarea de date brute implementată în DynamoDB va permite o analiză mai profundă a biletelor prin instrumente BI, cum ar fi AWS Quick Sight.

Vom lua în considerare două opțiuni pentru implementarea întregii infrastructuri:

  • Manual - prin AWS Management Console;
  • Infrastructura din codul Terraform este pentru automatori leneși;

Arhitectura sistemului dezvoltat

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Componente folosite:

  • Aviasales API — datele returnate de acest API vor fi folosite pentru toate lucrările ulterioare;
  • Instanță de producător EC2 — o mașină virtuală obișnuită în cloud pe care va fi generat fluxul de date de intrare:
    • Agent Kinesis este o aplicație Java instalată local pe mașină care oferă o modalitate ușoară de a colecta și trimite date către Kinesis (Kinesis Data Streams sau Kinesis Firehose). Agentul monitorizează constant un set de fișiere din directoarele specificate și trimite date noi către Kinesis;
    • Script apelant API — Un script Python care face cereri către API și pune răspunsul într-un folder care este monitorizat de agentul Kinesis;
  • Fluxuri de date Kinesis — serviciu de streaming de date în timp real, cu capacități mari de scalare;
  • Kinesis Analytics este un serviciu fără server care simplifică analiza datelor de streaming în timp real. Amazon Kinesis Data Analytics configurează resursele aplicației și se scalează automat pentru a gestiona orice volum de date primite;
  • AWS Lambdas — un serviciu care vă permite să rulați cod fără să faceți copii de rezervă sau să configurați servere. Toată puterea de calcul este scalată automat pentru fiecare apel;
  • Amazon DynamoDB - O bază de date de perechi cheie-valoare și documente care oferă o latență mai mică de 10 milisecunde atunci când rulează la orice scară. Când utilizați DynamoDB, nu trebuie să furnizați, corecționați sau gestionați niciun server. DynamoDB scalează automat tabelele pentru a ajusta cantitatea de resurse disponibile și pentru a menține performanța ridicată. Nu este necesară administrarea sistemului;
  • Amazon SNS - un serviciu complet gestionat pentru trimiterea de mesaje folosind modelul publisher-abonat (Pub/Sub), cu ajutorul căruia poți izola microservicii, sistemele distribuite și aplicațiile serverless. SNS poate fi folosit pentru a trimite informații către utilizatorii finali prin notificări push mobile, mesaje SMS și e-mailuri.

Antrenament initial

Pentru a emula fluxul de date, am decis să folosesc informațiile despre biletele de avion returnate de API-ul Aviasales. ÎN documentație o listă destul de extinsă de metode diferite, să luăm una dintre ele - „Calendarul lunar al prețurilor”, care returnează prețurile pentru fiecare zi a lunii, grupate după numărul de transferuri. Dacă nu specificați luna căutării în cerere, informațiile vor fi returnate pentru luna următoare celei curente.

Deci, haideți să ne înregistrăm și să obținem simbolul nostru.

Un exemplu de cerere este mai jos:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Metoda de mai sus de a primi date de la API prin specificarea unui token în cerere va funcționa, dar prefer să trec jetonul de acces prin antet, așa că vom folosi această metodă în scriptul api_caller.py.

Exemplu de răspuns:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Exemplul de răspuns API de mai sus arată un bilet de la Sankt Petersburg la Phuk... Oh, ce vis...
Deoarece sunt din Kazan, iar Phuket este acum „doar un vis”, să căutăm bilete de la Sankt Petersburg la Kazan.

Se presupune că aveți deja un cont AWS. Aș dori să atrag imediat o atenție deosebită asupra faptului că Kinesis și trimiterea notificărilor prin SMS nu sunt incluse în Nivel gratuit (utilizare gratuită). Dar chiar și în ciuda acestui fapt, având în vedere câțiva dolari, este foarte posibil să construiți sistemul propus și să vă jucați cu el. Și, desigur, nu uitați să ștergeți toate resursele după ce nu mai sunt necesare.

Din fericire, funcțiile DynamoDb și lambda vor fi gratuite pentru noi dacă ne îndeplinim limitele lunare gratuite. De exemplu, pentru DynamoDB: 25 GB de stocare, 25 WCU/RCU și 100 de milioane de interogări. Și un milion de apeluri la funcție lambda pe lună.

Implementarea manuală a sistemului

Configurarea fluxurilor de date Kinesis

Să mergem la serviciul Kinesis Data Streams și să creăm două fluxuri noi, câte un fragment pentru fiecare.

Ce este un ciob?
Un shard este unitatea de bază de transfer de date a unui flux Amazon Kinesis. Un segment oferă transfer de date de intrare la o viteză de 1 MB/s și transfer de date de ieșire la o viteză de 2 MB/s. Un segment acceptă până la 1000 de intrări PUT pe secundă. Când creați un flux de date, trebuie să specificați numărul necesar de segmente. De exemplu, puteți crea un flux de date cu două segmente. Acest flux de date va oferi transfer de date de intrare la 2 MB/s și transfer de date de ieșire la 4 MB/s, acceptând până la 2000 de înregistrări PUT pe secundă.

Cu cât sunt mai multe fragmente în fluxul dvs., cu atât debitul acestuia este mai mare. În principiu, așa sunt scalate fluxurile - prin adăugarea de fragmente. Dar cu cât ai mai multe cioburi, cu atât prețul este mai mare. Fiecare fragment costă 1,5 cenți pe oră și încă 1.4 cenți pentru fiecare milion de unități de încărcare utilă PUT.

Să creăm un flux nou cu numele bilete de avion, 1 ciob va fi suficient pentru el:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Acum să creăm un alt thread cu numele special_stream:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server

Configurarea producătorului

Pentru a analiza o sarcină, este suficient să folosiți o instanță EC2 obișnuită ca producător de date. Nu trebuie să fie o mașină virtuală puternică și costisitoare; un spot t2.micro se va descurca bine.

Notă importantă: de exemplu, ar trebui să utilizați imagine - Amazon Linux AMI 2018.03.0, are mai puține setări pentru lansarea rapidă a agentului Kinesis.

Accesați serviciul EC2, creați o nouă mașină virtuală, selectați AMI-ul dorit cu tipul t2.micro, care este inclus în Nivelul gratuit:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Pentru ca mașina virtuală nou creată să poată interacționa cu serviciul Kinesis, trebuie să i se acorde drepturi pentru a face acest lucru. Cel mai bun mod de a face acest lucru este de a atribui un rol IAM. Prin urmare, pe ecranul Pasul 3: Configurați detaliile instanței, ar trebui să selectați Creați un nou rol IAM:

Crearea unui rol IAM pentru EC2
Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
În fereastra care se deschide, selectați că creăm un nou rol pentru EC2 și accesați secțiunea Permisiuni:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Folosind exemplul de instruire, nu trebuie să analizăm toate complexitățile configurației granulare a drepturilor de resurse, așa că vom selecta politicile preconfigurate de Amazon: AmazonKinesisFullAccess și CloudWatchFullAccess.

Să dăm un nume semnificativ pentru acest rol, de exemplu: EC2-KinesisStreams-FullAccess. Rezultatul ar trebui să fie același cu cel din imaginea de mai jos:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
După crearea acestui nou rol, nu uitați să-l atașați instanței de mașină virtuală creată:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Nu modificăm nimic altceva pe acest ecran și trecem la ferestrele următoare.

Setările hard disk-ului pot fi lăsate implicite, la fel și etichetele (deși este o bună practică să folosiți etichete, cel puțin dați un nume instanței și indicați mediul).

Acum ne aflăm la Pasul 6: fila Configurare grup de securitate, unde trebuie să creați unul nou sau să specificați grupul de securitate existent, care vă permite să vă conectați prin ssh (portul 22) la instanță. Selectați Sursă -> IP-ul meu acolo și puteți lansa instanța.

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
De îndată ce trece la starea de rulare, puteți încerca să vă conectați la el prin ssh.

Pentru a putea lucra cu Kinesis Agent, după conectarea cu succes la mașină, trebuie să introduceți următoarele comenzi în terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Să creăm un folder pentru a salva răspunsurile API:

sudo mkdir /var/log/airline_tickets

Înainte de a porni agentul, trebuie să-i configurați configurația:

sudo vim /etc/aws-kinesis/agent.json

Conținutul fișierului agent.json ar trebui să arate astfel:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

După cum se poate vedea din fișierul de configurare, agentul va monitoriza fișierele cu extensia .log din directorul /var/log/airline_tickets/, le va analiza și le va transfera în fluxul airline_tickets.

Repornim serviciul și ne asigurăm că este în funcțiune:

sudo service aws-kinesis-agent restart

Acum să descarcăm scriptul Python care va solicita date de la API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Scriptul api_caller.py solicită date de la Aviasales și salvează răspunsul primit în directorul pe care agentul Kinesis îl scanează. Implementarea acestui script este destul de standard, există o clasă TicketsApi, vă permite să trageți asincron API-ul. Trecem un antet cu un token și solicităm parametri acestei clase:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Pentru a testa setările corecte și funcționalitatea agentului, să testăm rularea scriptului api_caller.py:

sudo ./api_caller.py TOKEN

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Și ne uităm la rezultatul muncii în jurnalele agenților și în fila Monitorizare din fluxul de date airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
După cum puteți vedea, totul funcționează, iar agentul Kinesis trimite cu succes date în flux. Acum să configuram consumer.

Configurarea Kinesis Data Analytics

Să trecem la componenta centrală a întregului sistem - creați o nouă aplicație în Kinesis Data Analytics numită kinesis_analytics_airlines_app:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Kinesis Data Analytics vă permite să efectuați analize de date în timp real din Kinesis Streams folosind limbajul SQL. Este un serviciu complet autoscaling (spre deosebire de Kinesis Streams) care:

  1. vă permite să creați noi fluxuri (Output Stream) pe baza solicitărilor de date sursă;
  2. furnizează un flux cu erori care au apărut în timpul rulării aplicațiilor (Error Stream);
  3. poate determina automat schema datelor de intrare (poate fi redefinită manual dacă este necesar).

Acesta nu este un serviciu ieftin - 0.11 USD pe oră de lucru, așa că ar trebui să îl utilizați cu atenție și să îl ștergeți când ați terminat.

Să conectăm aplicația la sursa de date:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Selectați fluxul la care ne vom conecta (airline_tickets):

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Apoi, trebuie să atașați un nou rol IAM, astfel încât aplicația să poată citi din flux și să scrie în flux. Pentru a face acest lucru, este suficient să nu schimbați nimic în blocul de permisiuni de acces:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Acum să cerem descoperirea schemei de date în flux; pentru a face acest lucru, faceți clic pe butonul „Descoperiți schema”. Ca urmare, rolul IAM va fi actualizat (va fi creat unul nou) și va fi lansată detectarea schemei din datele care au ajuns deja în flux:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Acum trebuie să mergeți la editorul SQL. Când faceți clic pe acest buton, va apărea o fereastră care vă va cere să lansați aplicația - selectați ceea ce doriți să lansați:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Introduceți următoarea interogare simplă în fereastra editorului SQL și faceți clic pe Salvare și executare SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

În bazele de date relaționale, lucrați cu tabele folosind instrucțiuni INSERT pentru a adăuga înregistrări și o instrucțiune SELECT pentru a interoga datele. În Amazon Kinesis Data Analytics, lucrați cu fluxuri (STREAM) și pompe (PUMP) - solicitări de inserare continue care inserează date dintr-un flux dintr-o aplicație într-un alt flux.

Interogarea SQL prezentată mai sus caută bilete Aeroflot la un cost sub cinci mii de ruble. Toate înregistrările care îndeplinesc aceste condiții vor fi plasate în fluxul DESTINATION_SQL_STREAM.

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
În blocul Destinație, selectați fluxul special_stream și în lista derulantă Numele fluxului în aplicație DESTINATION_SQL_STREAM:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Rezultatul tuturor manipulărilor ar trebui să fie ceva similar cu imaginea de mai jos:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server

Crearea și abonarea la un subiect SNS

Accesați Serviciul de notificare simplă și creați acolo un subiect nou cu numele Companii aeriene:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Abonați-vă la acest subiect și indicați numărul de telefon mobil la care vor fi trimise notificările prin SMS:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server

Creați un tabel în DynamoDB

Pentru a stoca datele brute din fluxul lor airline_tickets, să creăm un tabel în DynamoDB cu același nume. Vom folosi record_id ca cheie primară:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server

Crearea unui colector de funcții lambda

Să creăm o funcție lambda numită Collector, a cărei sarcină va fi să interogheze fluxul airline_tickets și, dacă se găsesc noi înregistrări acolo, să inserăm aceste înregistrări în tabelul DynamoDB. Evident, pe lângă drepturile implicite, acest lambda trebuie să aibă acces de citire la fluxul de date Kinesis și acces de scriere la DynamoDB.

Crearea unui rol IAM pentru funcția lambda de colector
Mai întâi, să creăm un nou rol IAM pentru lambda numit Lambda-TicketsProcessingRole:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Pentru exemplul de testare, politicile preconfigurate AmazonKinesisReadOnlyAccess și AmazonDynamoDBFullAccess sunt destul de potrivite, așa cum se arată în imaginea de mai jos:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Integrare API Aviasales cu Amazon Kinesis și simplitate fără server

Această lambda ar trebui să fie lansată de un declanșator de la Kinesis atunci când intrări noi intră în airline_stream, așa că trebuie să adăugăm un nou declanșator:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Mai rămâne doar să introduceți codul și să salvați lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Crearea unui notificator de funcție lambda

A doua funcție lambda, care va monitoriza al doilea flux (special_stream) și va trimite o notificare către SNS, este creată într-un mod similar. Prin urmare, acest lambda trebuie să aibă acces pentru a citi din Kinesis și a trimite mesaje către un anumit subiect SNS, care vor fi apoi trimise de serviciul SNS tuturor abonaților acestui subiect (e-mail, SMS etc.).

Crearea unui rol IAM
Mai întâi, creăm rolul IAM Lambda-KinesisAlarm pentru acest lambda, apoi atribuim acest rol alarm_notifier lambda care este creat:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Integrare API Aviasales cu Amazon Kinesis și simplitate fără server

Acest lambda ar trebui să funcționeze pe un declanșator pentru ca noi înregistrări să intre în special_stream, așa că trebuie să configurați declanșatorul în același mod ca și noi pentru Collector lambda.

Pentru a facilita configurarea acestei lambda, să introducem o nouă variabilă de mediu - TOPIC_ARN, unde plasăm ANR (Amazon Recourse Names) subiectului Companiei aeriene:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Și introduceți codul lambda, nu este deloc complicat:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Se pare că aici este finalizată configurarea manuală a sistemului. Tot ce rămâne este să testăm și să ne asigurăm că am configurat totul corect.

Implementați din codul Terraform

Pregătirea necesară

Terraform este un instrument open-source foarte convenabil pentru implementarea infrastructurii din cod. Are propria sa sintaxă care este ușor de învățat și are multe exemple despre cum și ce să implementeze. Editorul Atom sau Visual Studio Code are multe plugin-uri la îndemână care facilitează lucrul cu Terraform.

Puteți descărca distribuția prin urmare. O analiză detaliată a tuturor capabilităților Terraform depășește scopul acestui articol, așa că ne vom limita la punctele principale.

Cum să înceapă

Codul complet al proiectului este în depozitul meu. Clonăm depozitul pentru noi înșine. Înainte de a începe, trebuie să vă asigurați că aveți AWS CLI instalat și configurat, deoarece... Terraform va căuta acreditări în fișierul ~/.aws/credentials.

O practică bună este să rulați comanda plan înainte de a implementa întreaga infrastructură pentru a vedea ce creează Terraform în prezent pentru noi în cloud:

terraform.exe plan

Vi se va solicita să introduceți un număr de telefon către care să trimiteți notificări. Nu este necesar să îl introduceți în această etapă.

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
După ce am analizat planul de funcționare al programului, putem începe să creăm resurse:

terraform.exe apply

După trimiterea acestei comenzi, vi se va cere din nou să introduceți un număr de telefon; formați „da” când este afișată o întrebare despre efectuarea efectivă a acțiunilor. Acest lucru vă va permite să configurați întreaga infrastructură, să efectuați toată configurația necesară a EC2, să implementați funcții lambda etc.

După ce toate resursele au fost create cu succes prin codul Terraform, trebuie să intri în detaliile aplicației Kinesis Analytics (din păcate, nu am găsit cum să fac asta direct din cod).

Lansați aplicația:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
După aceasta, trebuie să setați în mod explicit numele fluxului în aplicație selectând din lista derulantă:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Acum totul este gata să meargă.

Testarea aplicației

Indiferent de modul în care ați implementat sistemul, manual sau prin codul Terraform, acesta va funcționa la fel.

Ne conectăm prin SSH la mașina virtuală EC2 unde este instalat Kinesis Agent și rulăm scriptul api_caller.py

sudo ./api_caller.py TOKEN

Tot ce trebuie să faci este să aștepți un SMS la numărul tău:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
SMS - mesajul ajunge pe telefon în aproape 1 minut:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server
Rămâne de văzut dacă înregistrările au fost salvate în baza de date DynamoDB pentru o analiză ulterioară, mai detaliată. Tabelul airline_tickets conține aproximativ următoarele date:

Integrare API Aviasales cu Amazon Kinesis și simplitate fără server

Concluzie

Pe parcursul lucrărilor efectuate, a fost construit un sistem de procesare a datelor online bazat pe Amazon Kinesis. Au fost luate în considerare opțiunile de utilizare a agentului Kinesis împreună cu Kinesis Data Streams și analiza în timp real Kinesis Analytics folosind comenzi SQL, precum și interacțiunea Amazon Kinesis cu alte servicii AWS.

Am implementat sistemul de mai sus în două moduri: unul manual destul de lung și unul rapid din codul Terraform.

Tot codul sursă al proiectului este disponibil în depozitul meu GitHub, vă sugerez să vă familiarizați cu el.

Mă bucur să discut despre articol, aștept comentariile voastre. Sper la critici constructive.

Vă urez succes!

Sursa: www.habr.com

Adauga un comentariu