Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera

Čau Habr!

Máte radi lietanie na lietadlách? Milujem to, ale počas sebaizolácie som sa zamiloval aj do analýzy údajov o letenkách z jedného známeho zdroja - Aviasales.

Dnes si rozoberieme prácu Amazon Kinesis, postavíme streamovací systém s analytikou v reálnom čase, nainštalujeme databázu Amazon DynamoDB NoSQL ako hlavné úložisko dát a nastavíme SMS notifikácie na zaujímavé vstupenky.

Všetky detaily sú pod strihom! Choď!

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera

Úvod

Napríklad potrebujeme prístup k Aviasales API. Prístup k nemu je poskytovaný bezplatne a bez obmedzení, stačí sa zaregistrovať v sekcii „Vývojári“, aby ste dostali svoj API token na prístup k údajom.

Hlavným účelom tohto článku je poskytnúť všeobecné pochopenie používania streamovania informácií v AWS; berieme do úvahy, že údaje vrátené použitým API nie sú striktne aktuálne a sú prenášané z vyrovnávacej pamäte, ktorá je vytvorené na základe vyhľadávaní používateľmi stránok Aviasales.ru a Jetradar.com za posledných 48 hodín.

Kinesis-agent, nainštalovaný na výrobnom stroji, prijatý cez API, automaticky analyzuje a prenesie dáta do požadovaného prúdu cez Kinesis Data Analytics. Surová verzia tohto streamu bude zapísaná priamo do obchodu. Úložisko nespracovaných údajov nasadené v DynamoDB umožní hlbšiu analýzu lístkov prostredníctvom nástrojov BI, ako je napríklad AWS Quick Sight.

Zvážime dve možnosti nasadenia celej infraštruktúry:

  • Manuálne - cez konzolu AWS Management Console;
  • Infraštruktúra z kódu Terraform je pre lenivé automaty;

Architektúra vyvinutého systému

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Použité komponenty:

  • Aviasales API — údaje vrátené týmto rozhraním API sa použijú na všetku ďalšiu prácu;
  • Inštancia výrobcu EC2 — bežný virtuálny stroj v cloude, na ktorom sa bude generovať vstupný dátový tok:
    • Kinesis Agent je aplikácia Java nainštalovaná lokálne na stroji, ktorá poskytuje jednoduchý spôsob zhromažďovania a odosielania údajov do Kinesis (Kinesis Data Streams alebo Kinesis Firehose). Agent neustále monitoruje množinu súborov v určených adresároch a posiela nové údaje do Kinesis;
    • Skript volajúceho API — Python skript, ktorý vytvára požiadavky na API a ukladá odpoveď do priečinka, ktorý je monitorovaný agentom Kinesis;
  • Kinesis dátové toky — služba streamovania údajov v reálnom čase so širokými možnosťami škálovania;
  • Kinesis Analytics je služba bez servera, ktorá zjednodušuje analýzu streamovaných dát v reálnom čase. Amazon Kinesis Data Analytics konfiguruje aplikačné zdroje a automaticky sa škáluje tak, aby spracoval akýkoľvek objem prichádzajúcich údajov;
  • AWS Lambda — služba, ktorá vám umožňuje spúšťať kód bez zálohovania alebo nastavovania serverov. Všetok výpočtový výkon je automaticky škálovaný pre každý hovor;
  • Amazon DynamoDB - Databáza párov kľúč-hodnota a dokumentov, ktorá poskytuje latenciu menšiu ako 10 milisekúnd pri spustení v akomkoľvek rozsahu. Keď používate DynamoDB, nemusíte zabezpečovať, opravovať ani spravovať žiadne servery. DynamoDB automaticky škáluje tabuľky tak, aby upravovali množstvo dostupných zdrojov a udržiavali vysoký výkon. Nie je potrebná žiadna správa systému;
  • Amazon SNS - plne spravovaná služba na odosielanie správ pomocou modelu vydavateľ-predplatiteľ (Pub/Sub), pomocou ktorej môžete izolovať mikroslužby, distribuované systémy a aplikácie bez servera. SNS možno použiť na odosielanie informácií koncovým používateľom prostredníctvom mobilných upozornení push, SMS správ a e-mailov.

Úvodné školenie

Aby som napodobnil dátový tok, rozhodol som sa použiť informácie o letenkách, ktoré vrátilo Aviasales API. IN dokumentáciu pomerne rozsiahly zoznam rôznych metód, zoberme si jednu z nich - „Mesačný cenový kalendár“, ktorý vracia ceny za každý deň v mesiaci, zoskupené podľa počtu prevodov. Ak v žiadosti neuvediete mesiac vyhľadávania, vrátia sa informácie za mesiac nasledujúci po aktuálnom mesiaci.

Tak sa zaregistrujte a získajte náš token.

Príklad žiadosti je uvedený nižšie:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Vyššie uvedený spôsob prijímania údajov z API zadaním tokenu v požiadavke bude fungovať, ale uprednostňujem odovzdanie prístupového tokenu cez hlavičku, takže túto metódu použijeme v skripte api_caller.py.

Príklad odpovede:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Príklad odpovede API vyššie ukazuje letenku z Petrohradu do Phuku... Ach, aký sen...
Keďže som z Kazane a Phuket je teraz „len sen“, poďme hľadať letenky z Petrohradu do Kazane.

Predpokladá sa, že už máte účet AWS. Okamžite upozorňujem najmä na to, že Kinesis a zasielanie notifikácií cez SMS nie sú zahrnuté v ročnej Bezplatná úroveň (bezplatné použitie). Ale aj napriek tomu, s pár dolármi na mysli, je celkom možné zostaviť navrhovaný systém a hrať sa s ním. A, samozrejme, nezabudnite vymazať všetky zdroje, keď už nie sú potrebné.

Našťastie funkcie DynamoDb a lambda budú pre nás zadarmo, ak splníme naše mesačné bezplatné limity. Napríklad pre DynamoDB: 25 GB úložného priestoru, 25 WCU/RCU a 100 miliónov dopytov. A milión volaní funkcie lambda za mesiac.

Manuálne nasadenie systému

Nastavenie dátových tokov Kinesis

Poďme do služby Kinesis Data Streams a vytvorte dva nové streamy, pre každý jeden zlomok.

Čo je to črep?
Úlomok je základná jednotka prenosu údajov streamu Amazon Kinesis. Jeden segment zabezpečuje prenos vstupných dát rýchlosťou 1 MB/s a výstup výstupných dát rýchlosťou 2 MB/s. Jeden segment podporuje až 1000 vstupov PUT za sekundu. Pri vytváraní toku údajov je potrebné zadať požadovaný počet segmentov. Môžete napríklad vytvoriť tok údajov s dvoma segmentmi. Tento dátový tok poskytne vstupný dátový prenos rýchlosťou 2 MB/s a výstupný dátový prenos rýchlosťou 4 MB/s s podporou až 2000 PUT záznamov za sekundu.

Čím viac zlomkov je vo vašom streame, tým väčšia je jeho priepustnosť. V princípe sa toky škálujú takto – pridávaním črepov. Ale čím viac úlomkov máte, tým vyššia je cena. Každý úlomok stojí 1,5 centa za hodinu a ďalších 1.4 centa za každý milión jednotiek užitočného zaťaženia PUT.

Vytvorme nový stream s názvom letenky, bude mu stačiť 1 črep:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Teraz vytvoríme ďalšie vlákno s názvom special_stream:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera

Nastavenie výrobcu

Na analýzu úlohy stačí použiť bežnú inštanciu EC2 ako producenta údajov. Nemusí to byť výkonný a drahý virtuálny stroj; spot t2.micro postačí.

Dôležitá poznámka: mali by ste napríklad použiť image - Amazon Linux AMI 2018.03.0, má menej nastavení na rýchle spustenie Kinesis Agenta.

Prejdite do služby EC2, vytvorte nový virtuálny stroj, vyberte požadovaný AMI s typom t2.micro, ktorý je súčasťou bezplatnej úrovne:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Aby mohol novovytvorený virtuálny stroj interagovať so službou Kinesis, musí mať na to práva. Najlepší spôsob, ako to urobiť, je priradiť rolu IAM. Preto by ste na obrazovke Krok 3: Konfigurácia podrobností inštancie mali vybrať Vytvorte novú rolu IAM:

Vytvorenie role IAM pre EC2
Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
V okne, ktoré sa otvorí, vyberte, že vytvárame novú rolu pre EC2 a prejdite do sekcie Povolenia:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Pomocou príkladu školenia nemusíme zachádzať do všetkých zložitostí podrobnej konfigurácie práv na zdroje, takže vyberieme pravidlá vopred nakonfigurované spoločnosťou Amazon: AmazonKinesisFullAccess a CloudWatchFullAccess.

Dajme tejto úlohe nejaký zmysluplný názov, napríklad: EC2-KinesisStreams-FullAccess. Výsledok by mal byť rovnaký ako na obrázku nižšie:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Po vytvorení tejto novej roly ju nezabudnite pripojiť k vytvorenej inštancii virtuálneho počítača:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Na tejto obrazovke už nič nemeníme a prejdeme na ďalšie okná.

Nastavenia pevného disku možno ponechať ako predvolené, rovnako ako značky (hoci je dobrým zvykom používať značky, aspoň pomenujte inštanciu a uveďte prostredie).

Teraz sme na karte Krok 6: Konfigurácia bezpečnostnej skupiny, kde musíte vytvoriť novú alebo zadať svoju existujúcu bezpečnostnú skupinu, ktorá vám umožní pripojiť sa cez ssh (port 22) k inštancii. Tam vyberte Zdroj -> Moja IP a môžete spustiť inštanciu.

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Hneď ako sa prepne do spusteného stavu, môžete sa k nemu pokúsiť pripojiť cez ssh.

Aby ste mohli pracovať s Kinesis Agent, musíte po úspešnom pripojení k stroju zadať do terminálu nasledujúce príkazy:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Vytvorme priečinok na uloženie odpovedí API:

sudo mkdir /var/log/airline_tickets

Pred spustením agenta musíte nakonfigurovať jeho konfiguráciu:

sudo vim /etc/aws-kinesis/agent.json

Obsah súboru agent.json by mal vyzerať takto:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Ako je zrejmé z konfiguračného súboru, agent bude sledovať súbory s príponou .log v adresári /var/log/airline_tickets/, analyzovať ich a prenášať do streamu airline_tickets.

Reštartujeme službu a uistíme sa, že je v prevádzke:

sudo service aws-kinesis-agent restart

Teraz si stiahneme skript Python, ktorý bude vyžadovať údaje z API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skript api_caller.py požaduje údaje od Aviasales a ukladá prijatú odpoveď do adresára, ktorý agent Kinesis skenuje. Implementácia tohto skriptu je celkom štandardná, existuje trieda TicketsApi, ktorá vám umožňuje asynchrónne ťahať API. Tejto triede odovzdávame hlavičku s tokenom a parametrami požiadavky:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Ak chcete otestovať správne nastavenia a funkčnosť agenta, vyskúšajte spustiť skript api_caller.py:

sudo ./api_caller.py TOKEN

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
A pozrieme sa na výsledok práce v denníkoch agenta a na karte Monitorovanie v toku údajov airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Ako vidíte, všetko funguje a Kinesis Agent úspešne odosiela dáta do streamu. Teraz nakonfigurujeme spotrebiteľa.

Nastavenie analýzy údajov Kinesis

Prejdime k centrálnej súčasti celého systému – vytvorte novú aplikáciu v Kinesis Data Analytics s názvom kinesis_analytics_airlines_app:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Kinesis Data Analytics vám umožňuje vykonávať analýzu údajov v reálnom čase z Kinesis Streams pomocou jazyka SQL. Ide o plne automatickú službu škálovania (na rozdiel od Kinesis Streams), ktorá:

  1. umožňuje vytvárať nové streamy (Output Stream) na základe požiadaviek na zdrojové dáta;
  2. poskytuje stream s chybami, ktoré sa vyskytli počas spustenia aplikácií (Error Stream);
  3. dokáže automaticky určiť schému vstupných údajov (v prípade potreby ju možno manuálne predefinovať).

Toto nie je lacná služba – 0.11 USD za hodinu práce, takže by ste ju mali používať opatrne a po dokončení ju vymazať.

Pripojme aplikáciu k zdroju údajov:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Vyberte stream, ku ktorému sa pripojíme (letenky):

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Ďalej musíte pripojiť novú rolu IAM, aby aplikácia mohla čítať zo streamu a zapisovať do streamu. Na to stačí nič nemeniť v bloku Prístupové oprávnenia:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Teraz požiadame o objavenie schémy údajov v streame; Ak to chcete urobiť, kliknite na tlačidlo „Objaviť schému“. V dôsledku toho sa aktualizuje rola IAM (vytvorí sa nová) a spustí sa zisťovanie schém z údajov, ktoré už prišli do streamu:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Teraz musíte prejsť do editora SQL. Po kliknutí na toto tlačidlo sa zobrazí okno s výzvou na spustenie aplikácie - vyberte, čo chcete spustiť:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Vložte nasledujúci jednoduchý dotaz do okna editora SQL a kliknite na Uložiť a spustiť SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

V relačných databázach pracujete s tabuľkami pomocou príkazov INSERT na pridávanie záznamov a príkazu SELECT na dopytovanie údajov. V Amazon Kinesis Data Analytics pracujete s prúdmi (STREAM) a pumpami (PUMP) – kontinuálne požiadavky na vkladanie, ktoré vkladajú údaje z jedného prúdu v aplikácii do iného prúdu.

SQL dotaz uvedený vyššie hľadá letenky Aeroflot za cenu nižšiu ako päť tisíc rubľov. Všetky záznamy, ktoré spĺňajú tieto podmienky, budú umiestnené v streame DESTINATION_SQL_STREAM.

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
V bloku Cieľ vyberte stream special_stream a v rozbaľovacom zozname Názov streamu v aplikácii DESTINATION_SQL_STREAM:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Výsledkom všetkých manipulácií by malo byť niečo podobné ako na obrázku nižšie:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera

Vytvorenie a prihlásenie na odber témy SNS

Prejdite na službu Simple Notification Service a vytvorte tam novú tému s názvom Airlines:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Prihláste sa na odber tejto témy a uveďte číslo mobilného telefónu, na ktoré sa budú odosielať upozornenia SMS:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera

Vytvorte tabuľku v DynamoDB

Ak chcete uložiť nespracované údaje z ich streamu airline_tickets, vytvorte tabuľku v DynamoDB s rovnakým názvom. Ako primárny kľúč použijeme record_id:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera

Vytvorenie kolektora funkcie lambda

Vytvorme si lambda funkciu s názvom Collector, ktorej úlohou bude osloviť stream airline_tickets a ak sa tam nájdu nové záznamy, vložiť tieto záznamy do tabuľky DynamoDB. Je zrejmé, že okrem predvolených práv musí mať táto lambda prístup na čítanie k dátovému toku Kinesis a prístup na zápis do DynamoDB.

Vytvorenie roly IAM pre funkciu kolektora lambda
Najprv vytvorte novú rolu IAM pre lambda s názvom Lambda-TicketsProcessingRole:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Pre testovací príklad sú celkom vhodné predkonfigurované zásady AmazonKinesisReadOnlyAccess a AmazonDynamoDBFullAccess, ako je znázornené na obrázku nižšie:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera

Táto lambda by mala byť spustená spúšťačom z Kinesis, keď nové položky vstúpia do airline_stream, takže musíme pridať nový spúšťač:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Zostáva len vložiť kód a uložiť lambdu.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Vytvorenie oznamovača funkcie lambda

Podobným spôsobom je vytvorená aj druhá lambda funkcia, ktorá bude sledovať druhý stream (special_stream) a posielať notifikáciu SNS. Táto lambda teda musí mať prístup na čítanie z Kinesis a odosielanie správ na danú tému SNS, ktoré následne služba SNS rozošle všetkým predplatiteľom tejto témy (email, SMS a pod.).

Vytvorenie role IAM
Najprv pre túto lambdu vytvoríme rolu IAM Lambda-KinesisAlarm a potom túto rolu priradíme vytváranej lambda alarm_notifier:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera

Táto lambda by mala fungovať na spúšťači pre vstup nových záznamov do special_stream, takže spúšťač musíte nakonfigurovať rovnakým spôsobom, ako sme to urobili pre kolektor lambda.

Aby sme uľahčili konfiguráciu tejto lambdy, predstavme novú premennú prostredia – TOPIC_ARN, kde umiestnime ANR (Amazon Recourse Names) témy Airlines:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
A vložte kód lambda, nie je to vôbec zložité:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Zdá sa, že tu je dokončená manuálna konfigurácia systému. Zostáva len otestovať a uistiť sa, že sme všetko nakonfigurovali správne.

Nasadenie z kódu Terraform

Požadovaná príprava

terraform je veľmi pohodlný open-source nástroj na nasadenie infraštruktúry z kódu. Má svoju vlastnú syntax, ktorá sa dá ľahko naučiť a má veľa príkladov, ako a čo nasadiť. Editor Atom alebo Visual Studio Code má veľa praktických doplnkov, ktoré uľahčujú prácu s Terraformom.

Distribúciu si môžete stiahnuť preto. Podrobná analýza všetkých schopností Terraform je nad rámec tohto článku, takže sa obmedzíme na hlavné body.

Ako bežať

Úplný kód projektu je v mojom úložisku. Klonujeme úložisko pre seba. Pred spustením sa musíte uistiť, že máte nainštalované a nakonfigurované AWS CLI, pretože... Terraform vyhľadá prihlasovacie údaje v súbore ~/.aws/credentials.

Osvedčeným postupom je spustiť príkaz plánu pred nasadením celej infraštruktúry, aby ste videli, čo pre nás Terraform momentálne vytvára v cloude:

terraform.exe plan

Zobrazí sa výzva na zadanie telefónneho čísla, na ktoré sa majú odosielať upozornenia. V tejto fáze ho nie je potrebné zadávať.

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Po analýze operačného plánu programu môžeme začať vytvárať zdroje:

terraform.exe apply

Po odoslaní tohto príkazu budete opäť vyzvaní na zadanie telefónneho čísla; keď sa zobrazí otázka o skutočnom vykonaní akcií, vytočte „áno“. To vám umožní nastaviť celú infraštruktúru, vykonať všetky potrebné konfigurácie EC2, nasadiť lambda funkcie atď.

Po úspešnom vytvorení všetkých zdrojov pomocou kódu Terraform musíte prejsť do podrobností o aplikácii Kinesis Analytics (bohužiaľ som nenašiel, ako to urobiť priamo z kódu).

Spustite aplikáciu:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Potom musíte explicitne nastaviť názov streamu v aplikácii výberom z rozbaľovacieho zoznamu:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Teraz je všetko pripravené.

Testovanie aplikácie

Bez ohľadu na to, ako ste systém nasadili, manuálne alebo prostredníctvom kódu Terraform, bude fungovať rovnako.

Prihlásime sa cez SSH na virtuálny stroj EC2, kde je nainštalovaný Kinesis Agent a spustíme skript api_caller.py

sudo ./api_caller.py TOKEN

Jediné, čo musíte urobiť, je počkať na SMS na vaše číslo:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
SMS - správa príde na telefón takmer za 1 minútu:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera
Zostáva zistiť, či boli záznamy uložené v databáze DynamoDB pre následnú, podrobnejšiu analýzu. Tabuľka airline_tickets obsahuje približne tieto údaje:

Integrácia Aviasales API s Amazon Kinesis a jednoduchosť bez servera

Záver

V rámci vykonaných prác bol vybudovaný online systém spracovania dát založený na Amazon Kinesis. Zvažovali sa možnosti použitia Agenta Kinesis v spojení s dátovými tokmi Kinesis a analytiky Kinesis Analytics v reálnom čase pomocou príkazov SQL, ako aj interakcia Amazon Kinesis s inými službami AWS.

Vyššie uvedený systém sme nasadili dvoma spôsobmi: pomerne dlhým manuálnym a rýchlym z kódu Terraform.

K dispozícii je celý zdrojový kód projektu v mojom úložisku GitHub, navrhujem, aby ste sa s tým oboznámili.

Rád podiskutujem článok, teším sa na vaše komentáre. Dúfam v konštruktívnu kritiku.

Prajem vám úspech!

Zdroj: hab.com

Pridať komentár