Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru

Čau Habr!

Máte rádi létající letadla? Miluji to, ale během sebeizolace jsem se také zamiloval do analýzy dat o letenkách z jednoho známého zdroje - Aviasales.

Dnes si rozebereme práci Amazon Kinesis, postavíme streamovací systém s analytikou v reálném čase, nainstalujeme databázi Amazon DynamoDB NoSQL jako hlavní úložiště dat a nastavíme SMS upozornění na zajímavé vstupenky.

Všechny detaily jsou pod střihem! Jít!

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru

úvod

Například potřebujeme přístup k Aviasales API. Přístup k němu je poskytován zdarma a bez omezení, stačí se zaregistrovat v sekci „Vývojáři“, abyste získali svůj API token pro přístup k datům.

Hlavním účelem tohoto článku je poskytnout obecné porozumění použití streamování informací v AWS; bereme v úvahu, že data vrácená použitým API nejsou striktně aktuální a jsou přenášena z mezipaměti, která je vytvořené na základě vyhledávání uživatelů stránek Aviasales.ru a Jetradar.com za posledních 48 hodin.

Kinesis-agent nainstalovaný na produkčním stroji, přijatý přes API, automaticky analyzuje a přenese data do požadovaného proudu pomocí Kinesis Data Analytics. Surová verze tohoto streamu bude zapsána přímo do obchodu. Úložiště nezpracovaných dat nasazené v DynamoDB umožní hlubší analýzu lístků prostřednictvím nástrojů BI, jako je AWS Quick Sight.

Zvážíme dvě možnosti nasazení celé infrastruktury:

  • Manuální - přes AWS Management Console;
  • Infrastruktura z kódu Terraform je pro líné automaty;

Architektura vyvíjeného systému

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Použité komponenty:

  • Aviasales API — data vrácená tímto API budou použita pro veškerou následnou práci;
  • Instance výrobce EC2 — běžný virtuální stroj v cloudu, na kterém bude generován vstupní datový tok:
    • Kinesis Agent je aplikace Java nainstalovaná lokálně na stroji, která poskytuje snadný způsob shromažďování a odesílání dat do Kinesis (Kinesis Data Streams nebo Kinesis Firehose). Agent neustále monitoruje sadu souborů v určených adresářích a posílá nová data do Kinesis;
    • API volající skript — Python skript, který odesílá požadavky na API a vkládá odpověď do složky, která je monitorována agentem Kinesis;
  • Datové toky Kinesis — služba streamování dat v reálném čase se širokými možnostmi škálování;
  • Kinesis Analytics je služba bez serveru, která zjednodušuje analýzu streamovaných dat v reálném čase. Amazon Kinesis Data Analytics konfiguruje prostředky aplikace a automaticky se škáluje tak, aby zpracovávala jakýkoli objem příchozích dat;
  • AWS Lambda — služba, která vám umožňuje spouštět kód bez zálohování nebo nastavování serverů. Veškerý výpočetní výkon je automaticky škálován pro každý hovor;
  • Amazon DynamoDB - Databáze párů klíč-hodnota a dokumentů, která poskytuje latenci menší než 10 milisekund při běhu v libovolném měřítku. Při používání DynamoDB nemusíte zajišťovat, opravovat ani spravovat žádné servery. DynamoDB automaticky škáluje tabulky tak, aby upravovaly množství dostupných zdrojů a udržovaly vysoký výkon. Není vyžadována žádná správa systému;
  • Amazon SNS - plně spravovaná služba pro odesílání zpráv pomocí modelu vydavatel-odběratel (Pub/Sub), pomocí které můžete izolovat mikroslužby, distribuované systémy a aplikace bez serveru. SNS lze použít k odesílání informací koncovým uživatelům prostřednictvím mobilních push notifikací, SMS zpráv a e-mailů.

Počáteční školení

K emulaci datového toku jsem se rozhodl použít informace o letenkách vrácené Aviasales API. V dokumentace poměrně rozsáhlý seznam různých metod, vezměme si jednu z nich - „Měsíční cenový kalendář“, který vrací ceny za každý den v měsíci, seskupené podle počtu převodů. Pokud v žádosti neuvedete měsíc hledání, budou vráceny informace za měsíc následující po aktuálním měsíci.

Tak se registrujte a získejte náš token.

Příklad žádosti je níže:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Výše uvedený způsob příjmu dat z API zadáním tokenu v požadavku bude fungovat, ale já preferuji předání přístupového tokenu přes hlavičku, takže tuto metodu použijeme ve skriptu api_caller.py.

Příklad odpovědi:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Příklad odpovědi API výše ukazuje letenku z Petrohradu do Phuku... Ach, jaký sen...
Protože jsem z Kazaně a Phuket je nyní „jen sen“, hledejme letenky z Petrohradu do Kazaně.

Předpokládá se, že již máte účet AWS. Okamžitě upozorňuji především na to, že v roční nejsou zahrnuty Kinesis a zasílání notifikací pomocí SMS Bezplatná úroveň (bezplatné použití). Ale i přes to, s pár dolary na mysli, je docela možné sestavit navrhovaný systém a hrát si s ním. A samozřejmě nezapomeňte smazat všechny zdroje poté, co již nejsou potřeba.

Naštěstí funkce DynamoDb a lambda pro nás budou zdarma, pokud splníme naše měsíční bezplatné limity. Například pro DynamoDB: 25 GB úložiště, 25 WCU/RCU a 100 milionů dotazů. A milion volání funkce lambda za měsíc.

Ruční nasazení systému

Nastavení datových toků Kinesis

Pojďme do služby Kinesis Data Streams a vytvořte dva nové streamy, pro každý jeden shard.

Co je to střep?
Shard je základní jednotka přenosu dat streamu Amazon Kinesis. Jeden segment zajišťuje přenos vstupních dat rychlostí 1 MB/s a přenos výstupních dat rychlostí 2 MB/s. Jeden segment podporuje až 1000 vstupů PUT za sekundu. Při vytváření datového toku je třeba zadat požadovaný počet segmentů. Můžete například vytvořit datový stream se dvěma segmenty. Tento datový tok zajistí přenos vstupních dat rychlostí 2 MB/s a přenos výstupních dat rychlostí 4 MB/s s podporou až 2000 záznamů PUT za sekundu.

Čím více fragmentů ve vašem streamu, tím větší je jeho propustnost. V principu se toky škálují – přidáváním střepů. Ale čím více střepů máte, tím vyšší je cena. Každý úlomek stojí 1,5 centu za hodinu a dalších 1.4 centu za každý milion jednotek užitečného zatížení PUT.

Vytvořme nový stream s názvem letenky, bude mu stačit 1 střep:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Nyní vytvoříme další vlákno s názvem speciální_stream:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru

Nastavení výrobce

K analýze úlohy stačí jako producent dat použít běžnou instanci EC2. Nemusí to být výkonný, drahý virtuální stroj, spot t2.micro postačí.

Důležitá poznámka: měli byste například použít image - Amazon Linux AMI 2018.03.0, má méně nastavení pro rychlé spuštění Kinesis Agent.

Přejděte do služby EC2, vytvořte nový virtuální stroj, vyberte požadovaný AMI typu t2.micro, který je součástí bezplatné úrovně:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Aby mohl nově vytvořený virtuální stroj komunikovat se službou Kinesis, musí k tomu mít oprávnění. Nejlepší způsob, jak toho dosáhnout, je přiřadit roli IAM. Na obrazovce Krok 3: Konfigurace podrobností instance byste proto měli vybrat Vytvořte novou roli IAM:

Vytvoření role IAM pro EC2
Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
V okně, které se otevře, vyberte, že vytváříme novou roli pro EC2, a přejděte do části Oprávnění:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Na příkladu školení nemusíme zacházet do všech složitostí podrobné konfigurace práv ke zdrojům, takže vybereme zásady předem nakonfigurované Amazonem: AmazonKinesisFullAccess a CloudWatchFullAccess.

Uveďme pro tuto roli nějaký smysluplný název, například: EC2-KinesisStreams-FullAccess. Výsledek by měl být stejný jako na obrázku níže:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Po vytvoření této nové role ji nezapomeňte připojit k vytvořené instanci virtuálního počítače:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Na této obrazovce již nic neměníme a přecházíme do dalších oken.

Nastavení pevného disku lze ponechat jako výchozí, stejně jako značky (ačkoli je dobré značky používat, alespoň instanci pojmenujte a uveďte prostředí).

Nyní jsme na kartě Krok 6: Konfigurace skupiny zabezpečení, kde je třeba vytvořit novou nebo zadat stávající skupinu zabezpečení, která vám umožní připojit se přes ssh (port 22) k instanci. Zde vyberte Zdroj -> Moje IP a můžete spustit instanci.

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Jakmile se přepne do spuštěného stavu, můžete se k němu pokusit připojit přes ssh.

Abyste mohli pracovat s Kinesis Agent, musíte po úspěšném připojení ke stroji zadat do terminálu následující příkazy:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Vytvořme složku pro ukládání odpovědí API:

sudo mkdir /var/log/airline_tickets

Před spuštěním agenta musíte nakonfigurovat jeho konfiguraci:

sudo vim /etc/aws-kinesis/agent.json

Obsah souboru agent.json by měl vypadat takto:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Jak je vidět z konfiguračního souboru, agent bude sledovat soubory s příponou .log v adresáři /var/log/airline_tickets/, analyzovat je a přenášet do streamu airline_tickets.

Restartujeme službu a ujistíme se, že je v provozu:

sudo service aws-kinesis-agent restart

Nyní si stáhneme skript Python, který bude vyžadovat data z API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skript api_caller.py požaduje data od Aviasales a přijatou odpověď uloží do adresáře, který agent Kinesis prohledá. Implementace tohoto skriptu je celkem standardní, existuje třída TicketsApi, umožňuje asynchronně tahat API. Této třídě předáme hlavičku s tokenem a parametry požadavku:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Chcete-li otestovat správné nastavení a funkčnost agenta, otestujte spuštění skriptu api_caller.py:

sudo ./api_caller.py TOKEN

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
A podíváme se na výsledek práce v protokolech agenta a na záložce Monitoring v datovém toku airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Jak vidíte, vše funguje a Kinesis Agent úspěšně odesílá data do streamu. Nyní nakonfigurujeme spotřebitele.

Nastavení analýzy dat Kinesis

Pojďme k centrální součásti celého systému – vytvořte novou aplikaci v Kinesis Data Analytics s názvem kinesis_analytics_airlines_app:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Kinesis Data Analytics vám umožňuje provádět analýzu dat v reálném čase z Kinesis Streams pomocí jazyka SQL. Jedná se o službu plně automatického škálování (na rozdíl od Kinesis Streams), která:

  1. umožňuje vytvářet nové streamy (Output Stream) na základě požadavků na zdrojová data;
  2. poskytuje stream s chybami, ke kterým došlo během běhu aplikací (Error Stream);
  3. dokáže automaticky určit schéma vstupních dat (v případě potřeby jej lze ručně předefinovat).

Nejedná se o levnou službu – 0.11 USD za hodinu práce, takže byste ji měli používat opatrně a po dokončení ji smazat.

Připojíme aplikaci ke zdroji dat:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Vyberte stream, ke kterému se připojíme (letenky):

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Dále je třeba připojit novou roli IAM, aby aplikace mohla číst ze streamu a zapisovat do streamu. K tomu stačí nic neměnit v bloku Přístupová oprávnění:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Nyní požádáme o zjištění schématu dat ve streamu; k tomu klikněte na tlačítko „Discover schema“. V důsledku toho bude aktualizována role IAM (vytvoří se nová) a bude spuštěna detekce schématu z dat, která již dorazila do streamu:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Nyní musíte přejít do editoru SQL. Po kliknutí na toto tlačítko se zobrazí okno s výzvou ke spuštění aplikace – vyberte, co chcete spustit:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Vložte následující jednoduchý dotaz do okna editoru SQL a klikněte na Uložit a spustit SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

V relačních databázích pracujete s tabulkami pomocí příkazů INSERT k přidávání záznamů a příkazu SELECT k dotazování na data. V Amazon Kinesis Data Analytics pracujete se streamy (STREAMY) a pumpami (PUMP) – kontinuálními požadavky na vkládání, které vkládají data z jednoho streamu v aplikaci do jiného streamu.

SQL dotaz uvedený výše vyhledává letenky Aeroflot za cenu nižší než pět tisíc rublů. Všechny záznamy, které splňují tyto podmínky, budou umístěny do streamu DESTINATION_SQL_STREAM.

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
V bloku Cíl vyberte stream special_stream a v rozevíracím seznamu název streamu v aplikaci DESTINATION_SQL_STREAM:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Výsledkem všech manipulací by mělo být něco podobného jako na obrázku níže:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru

Vytvoření a přihlášení k odběru tématu SNS

Přejděte na službu Simple Notification Service a vytvořte zde nové téma s názvem Airlines:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Přihlaste se k odběru tohoto tématu a uveďte číslo mobilního telefonu, na které budou zasílána upozornění SMS:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru

Vytvořte tabulku v DynamoDB

Chcete-li uložit nezpracovaná data z jejich streamu airline_tickets, vytvořte tabulku v DynamoDB se stejným názvem. Jako primární klíč použijeme record_id:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru

Vytvoření kolektoru lambda funkce

Vytvořme lambda funkci s názvem Collector, jejímž úkolem bude dotazovat stream airline_tickets a pokud se tam najdou nové záznamy, vložit tyto záznamy do tabulky DynamoDB. Je zřejmé, že kromě výchozích práv musí mít tato lambda přístup pro čtení k datovému toku Kinesis a přístup pro zápis do DynamoDB.

Vytvoření role IAM pro funkci kolektoru lambda
Nejprve vytvořte novou roli IAM pro lambda s názvem Lambda-TicketsProcessingRole:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Pro testovací příklad jsou docela vhodné předkonfigurované zásady AmazonKinesisReadOnlyAccess a AmazonDynamoDBFullAccess, jak ukazuje obrázek níže:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru

Tato lambda by měla být spuštěna spouštěčem z Kinesis, když do airline_streamu vstoupí nové položky, takže musíme přidat nový spouštěč:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Zbývá pouze vložit kód a uložit lambdu.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Vytvoření oznamovače funkce lambda

Podobným způsobem je vytvořena i druhá lambda funkce, která bude sledovat druhý stream (special_stream) a odesílat upozornění na SNS. Tato lambda tedy musí mít přístup ke čtení z Kinesis a odesílání zpráv na dané téma SNS, které pak služba SNS rozesílá všem odběratelům tohoto tématu (email, SMS atd.).

Vytvoření role IAM
Nejprve pro tuto lambdu vytvoříme roli IAM Lambda-KinesisAlarm a poté tuto roli přiřadíme vytvářené lambda alarm_notifier:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru

Tato lambda by měla fungovat na spouštěči pro vstup nových záznamů do special_stream, takže musíte spouštěč nakonfigurovat stejným způsobem, jako jsme to udělali pro lambdu Collector.

Abychom usnadnili konfiguraci této lambdy, zavedeme novou proměnnou prostředí – TOPIC_ARN, kam umístíme ANR (Amazon Recourse Names) tématu Airlines:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
A vložte kód lambda, není to vůbec složité:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Zdá se, že zde je ruční konfigurace systému dokončena. Zbývá jen otestovat a ujistit se, že jsme vše správně nakonfigurovali.

Nasazení z kódu Terraform

Nezbytná příprava

Terraform je velmi pohodlný open-source nástroj pro nasazení infrastruktury z kódu. Má svou vlastní syntaxi, kterou je snadné se naučit a obsahuje mnoho příkladů, jak a co nasadit. Editor Atom nebo Visual Studio Code má mnoho praktických pluginů, které usnadňují práci s Terraformem.

Distribuci si můžete stáhnout proto. Podrobná analýza všech schopností Terraformu přesahuje rámec tohoto článku, omezíme se proto na hlavní body.

Jak běžet

Úplný kód projektu je v mém úložišti. Klonujeme úložiště pro sebe. Než začnete, musíte se ujistit, že máte nainstalované a nakonfigurované AWS CLI, protože... Terraform bude hledat přihlašovací údaje v souboru ~/.aws/credentials.

Osvědčeným postupem je spustit příkaz plan před nasazením celé infrastruktury, abyste viděli, co pro nás Terraform aktuálně vytváří v cloudu:

terraform.exe plan

Budete vyzváni k zadání telefonního čísla, na které chcete zasílat upozornění. V této fázi není nutné jej zadávat.

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Po analýze provozního plánu programu můžeme začít vytvářet zdroje:

terraform.exe apply

Po odeslání tohoto příkazu budete opět vyzváni k zadání telefonního čísla, při zobrazení dotazu na skutečné provedení akcí vytočte „ano“. To vám umožní nastavit celou infrastrukturu, provést veškerou potřebnou konfiguraci EC2, nasadit lambda funkce atd.

Poté, co byly všechny zdroje úspěšně vytvořeny pomocí kódu Terraform, musíte jít do podrobností o aplikaci Kinesis Analytics (bohužel jsem nenašel, jak to udělat přímo z kódu).

Spusťte aplikaci:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Poté musíte explicitně nastavit název streamu v aplikaci výběrem z rozevíracího seznamu:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Nyní je vše připraveno.

Testování aplikace

Bez ohledu na to, jak jste systém nasadili, ručně nebo prostřednictvím kódu Terraform, bude fungovat stejně.

Přihlásíme se přes SSH na virtuální stroj EC2, kde je nainstalován Kinesis Agent a spustíme skript api_caller.py

sudo ./api_caller.py TOKEN

Jediné, co musíte udělat, je počkat na SMS na vaše číslo:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
SMS - zpráva dorazí na telefon téměř za 1 minutu:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru
Zbývá zjistit, zda byly záznamy uloženy v databázi DynamoDB pro následnou podrobnější analýzu. Tabulka airline_tickets obsahuje přibližně následující údaje:

Integrace Aviasales API s Amazon Kinesis a jednoduchost bez serveru

Závěr

V průběhu prací byl vybudován online systém zpracování dat na bázi Amazon Kinesis. Byly zváženy možnosti použití Kinesis Agent ve spojení s Kinesis Data Streams a analýzou v reálném čase Kinesis Analytics pomocí příkazů SQL, stejně jako interakce Amazon Kinesis s dalšími službami AWS.

Výše uvedený systém jsme nasadili dvěma způsoby: poměrně dlouhým manuálním a rychlým z kódu Terraform.

K dispozici je veškerý zdrojový kód projektu v mém úložišti GitHub, doporučuji, abyste se s tím seznámili.

Rád k článku diskutuji, těším se na vaše komentáře. Doufám v konstruktivní kritiku.

Přeji vám úspěch!

Zdroj: www.habr.com

Přidat komentář