Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa

Hej Habra!

Czy lubisz latać samolotami? Uwielbiam to, ale podczas samoizolacji zakochałam się też w analizowaniu danych o biletach lotniczych z jednego znanego zasobu – Aviasales.

Dziś przeanalizujemy pracę Amazon Kinesis, zbudujemy system streamingowy z analityką w czasie rzeczywistym, zainstalujemy bazę danych Amazon DynamoDB NoSQL jako główny magazyn danych oraz skonfigurujemy powiadomienia SMS o ciekawych biletach.

Wszystkie szczegóły znajdują się pod wycięciem! Iść!

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa

Wprowadzenie

Na przykład potrzebujemy dostępu do API Aviasales. Dostęp do niego jest bezpłatny i bez ograniczeń, wystarczy zarejestrować się w sekcji „Programiści”, aby otrzymać token API umożliwiający dostęp do danych.

Głównym celem tego artykułu jest przedstawienie ogólnego zrozumienia stosowania strumieniowania informacji w AWS; bierzemy pod uwagę, że dane zwracane przez wykorzystywane API nie są ściśle aktualne i są przesyłane z pamięci podręcznej, która jest utworzone na podstawie wyszukiwań przeprowadzonych przez użytkowników witryn Aviasales.ru i Jetradar.com w ciągu ostatnich 48 godzin.

Kinesis-agent zainstalowany na maszynie produkcyjnej, odebrany przez API, automatycznie przeanalizuje i prześle dane do żądanego strumienia poprzez Kinesis Data Analytics. Surowa wersja tego strumienia zostanie zapisana bezpośrednio w sklepie. Magazyn surowych danych wdrożony w DynamoDB umożliwi głębszą analizę zgłoszeń za pomocą narzędzi BI, takich jak AWS Quick Sight.

Rozważymy dwie opcje wdrożenia całej infrastruktury:

  • Ręcznie – poprzez Konsolę Zarządzającą AWS;
  • Infrastruktura z kodu Terraform jest dla leniwych automatów;

Architektura opracowanego systemu

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Wykorzystane komponenty:

  • API Aviasales — dane zwrócone przez to API zostaną wykorzystane do wszystkich późniejszych prac;
  • Instancja producenta EC2 — zwykła maszyna wirtualna w chmurze, na której będzie generowany strumień danych wejściowych:
    • Agent Kinesis to aplikacja Java instalowana lokalnie na komputerze, która umożliwia łatwe gromadzenie i wysyłanie danych do Kinesis (Kinesis Data Streams lub Kinesis Firehose). Agent stale monitoruje zestaw plików we wskazanych katalogach i wysyła nowe dane do Kinesis;
    • Skrypt wywołujący API — Skrypt w języku Python wysyłający żądania do interfejsu API i umieszczający odpowiedź w folderze monitorowanym przez agenta Kinesis;
  • Strumienie danych kinezy — usługa strumieniowego przesyłania danych w czasie rzeczywistym z szerokimi możliwościami skalowania;
  • Analityka kinezy to usługa bezserwerowa, która upraszcza analizę danych przesyłanych strumieniowo w czasie rzeczywistym. Amazon Kinesis Data Analytics konfiguruje zasoby aplikacji i automatycznie skaluje się, aby obsłużyć dowolną ilość przychodzących danych;
  • AWS Lambda — usługa umożliwiająca uruchomienie kodu bez tworzenia kopii zapasowych i konfigurowania serwerów. Cała moc obliczeniowa jest automatycznie skalowana dla każdego połączenia;
  • Amazon DynamoDB - Baza danych par klucz-wartość i dokumentów zapewniająca opóźnienie mniejsze niż 10 milisekund podczas działania w dowolnej skali. Korzystając z DynamoDB, nie trzeba udostępniać, instalować poprawek ani zarządzać żadnymi serwerami. DynamoDB automatycznie skaluje tabele, aby dostosować ilość dostępnych zasobów i zachować wysoką wydajność. Nie jest wymagana administracja systemem;
  • Amazon SNS - w pełni zarządzana usługa wysyłania wiadomości w modelu wydawca-subskrybent (Pub/Sub), dzięki której można izolować mikroserwisy, systemy rozproszone i aplikacje bezserwerowe. SNS może służyć do wysyłania informacji do użytkowników końcowych za pośrednictwem mobilnych powiadomień push, wiadomości SMS i e-maili.

Wstępny trening

Aby emulować przepływ danych, zdecydowałem się wykorzystać informacje o bilecie lotniczym zwrócone przez API Aviasales. W dokumentacja dość obszerna lista różnych metod, weźmy jedną z nich - „Kalendarz cen miesięcznych”, który zwraca ceny na każdy dzień miesiąca, pogrupowane według liczby przelewów. Jeżeli w zapytaniu nie określisz miesiąca wyszukiwania, zwrócona zostanie informacja za miesiąc następujący po bieżącym.

Zarejestrujmy się więc i zdobądźmy nasz token.

Przykładowe żądanie znajduje się poniżej:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Powyższa metoda odbioru danych z API poprzez podanie tokena w żądaniu będzie działać, jednak ja wolę przekazywać token dostępu przez nagłówek, dlatego skorzystamy z tej metody w skrypcie api_caller.py.

Przykład odpowiedzi:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Przykładowa odpowiedź API powyżej pokazuje bilet z Petersburga do Phuk... Och, co za sen...
Jako że jestem z Kazania, a Phuket to już „tylko marzenie”, rozglądajmy się za biletami z Petersburga do Kazania.

Zakłada się, że masz już konto AWS. Pragnę od razu zwrócić szczególną uwagę na fakt, że Kinesis i wysyłanie powiadomień SMS-em nie są wliczane do rocznika Poziom bezpłatny (bezpłatne korzystanie). Ale nawet pomimo tego, mając na uwadze kilka dolarów, całkiem możliwe jest zbudowanie proponowanego systemu i zabawa nim. I oczywiście nie zapomnij usunąć wszystkich zasobów, gdy nie będą już potrzebne.

Na szczęście funkcje DynamoDb i lambda będą dla nas darmowe, jeśli wykorzystamy nasze miesięczne darmowe limity. Na przykład dla DynamoDB: 25 GB pamięci, 25 WCU/RCU i 100 milionów zapytań. I milion wywołań funkcji lambda miesięcznie.

Ręczne wdrożenie systemu

Konfigurowanie strumieni danych Kinesis

Przejdźmy do usługi Kinesis Data Streams i utwórzmy dwa nowe strumienie, po jednym fragmencie dla każdego.

Co to jest odłamek?
Shard to podstawowa jednostka przesyłania danych w strumieniu Amazon Kinesis. Jeden segment zapewnia przesyłanie danych wejściowych z szybkością 1 MB/s i przesyłanie danych wyjściowych z szybkością 2 MB/s. Jeden segment obsługuje do 1000 wpisów PUT na sekundę. Tworząc strumień danych, należy określić wymaganą liczbę segmentów. Można na przykład utworzyć strumień danych składający się z dwóch segmentów. Ten strumień danych zapewni transfer danych wejściowych z szybkością 2 MB/s i transfer danych wyjściowych z szybkością 4 MB/s, obsługując do 2000 rekordów PUT na sekundę.

Im więcej fragmentów w strumieniu, tym większa jego przepustowość. W zasadzie tak właśnie skaluje się przepływy - poprzez dodawanie fragmentów. Ale im więcej masz odłamków, tym wyższa cena. Każdy odłamek kosztuje 1,5 centa za godzinę i dodatkowe 1.4 centa za każdy milion jednostek ładunku PUT.

Utwórzmy nowy strumień o nazwie bilety lotnicze, wystarczy mu 1 odłamek:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Utwórzmy teraz kolejny wątek o nazwie specjalny_strumień:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa

Konfiguracja producenta

Do analizy zadania wystarczy wykorzystać zwykłą instancję EC2 jako producenta danych. Nie musi to być potężna i droga maszyna wirtualna – spot t2.micro będzie w zupełności wystarczający.

Ważna uwaga: na przykład powinieneś użyć obrazu - Amazon Linux AMI 2018.03.0, ma on mniej ustawień do szybkiego uruchamiania agenta Kinesis.

Przejdź do usługi EC2, utwórz nową maszynę wirtualną, wybierz żądany AMI z typem t2.micro, który jest zawarty w warstwie bezpłatnej:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Aby nowo utworzona maszyna wirtualna mogła wchodzić w interakcję z usługą Kinesis, należy jej nadać do tego uprawnienia. Najlepszym sposobem na osiągnięcie tego jest przypisanie roli IAM. Dlatego na ekranie Krok 3: Skonfiguruj szczegóły instancji należy wybrać Utwórz nową rolę IAM:

Tworzenie roli IAM dla EC2
Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
W oknie, które zostanie otwarte zaznacz, że tworzymy nową rolę dla EC2 i przejdź do sekcji Uprawnienia:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Korzystając z przykładu szkoleniowego, nie musimy wnikać we wszystkie zawiłości szczegółowej konfiguracji praw do zasobów, dlatego wybierzemy polityki wstępnie skonfigurowane przez Amazon: AmazonKinesisFullAccess i CloudWatchFullAccess.

Nadajmy tej roli jakąś sensowną nazwę, na przykład: EC2-KinesisStreams-FullAccess. Wynik powinien być taki sam, jak pokazano na poniższym obrazku:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Po utworzeniu tej nowej roli nie zapomnij dołączyć jej do utworzonej instancji maszyny wirtualnej:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Nic więcej nie zmieniamy na tym ekranie i przechodzimy do kolejnych okien.

Ustawienia dysku twardego można pozostawić domyślne, podobnie jak tagi (chociaż dobrą praktyką jest używanie tagów, przynajmniej nadaj instancji nazwę i wskaż środowisko).

Teraz jesteśmy na karcie Krok 6: Skonfiguruj grupę zabezpieczeń, gdzie należy utworzyć nową lub określić istniejącą grupę zabezpieczeń, która umożliwia połączenie się przez ssh (port 22) z instancją. Wybierz tam Źródło -> Moje IP i możesz uruchomić instancję.

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Gdy tylko przejdzie w stan uruchomienia, możesz spróbować połączyć się z nim przez ssh.

Aby móc pracować z Kinesis Agentem, po pomyślnym połączeniu z maszyną należy wpisać w terminalu następujące polecenia:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Utwórzmy folder do zapisywania odpowiedzi API:

sudo mkdir /var/log/airline_tickets

Przed uruchomieniem agenta należy skonfigurować jego konfigurację:

sudo vim /etc/aws-kinesis/agent.json

Zawartość pliku agent.json powinna wyglądać następująco:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Jak widać z pliku konfiguracyjnego agent będzie monitorował pliki z rozszerzeniem .log w katalogu /var/log/airline_tickets/, analizował je i przesyłał do strumienia Airlines_tickets.

Ponownie uruchamiamy usługę i upewniamy się, że działa:

sudo service aws-kinesis-agent restart

Pobierzmy teraz skrypt Pythona, który będzie żądał danych z API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skrypt api_caller.py żąda danych od Aviasales i zapisuje otrzymaną odpowiedź w katalogu skanowanym przez agenta Kinesis. Implementacja tego skryptu jest dość standardowa, istnieje klasa TicketsApi, która pozwala na asynchroniczne ściąganie API. Do tej klasy przekazujemy nagłówek z tokenem i parametry żądania:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Aby przetestować poprawność ustawień i funkcjonalność agenta, przetestujmy uruchomienie skryptu api_caller.py:

sudo ./api_caller.py TOKEN

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
I na wynik pracy patrzymy w logach Agenta oraz na zakładce Monitoring w strumieniu danych Airlines_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Jak widać wszystko działa i Kinesis Agent pomyślnie wysyła dane do strumienia. Teraz skonfigurujmy konsumenta.

Konfigurowanie analizy danych Kinesis

Przejdźmy do centralnego komponentu całego systemu - utwórz w Kinesis Data Analytics nową aplikację o nazwie kinesis_analytics_airlines_app:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Kinesis Data Analytics umożliwia analizę danych w czasie rzeczywistym ze strumieni Kinesis przy użyciu języka SQL. Jest to usługa w pełni autoskalująca (w przeciwieństwie do Kinesis Streams), która:

  1. umożliwia tworzenie nowych strumieni (Output Stream) na podstawie żądań danych źródłowych;
  2. udostępnia strumień błędów, które wystąpiły podczas działania aplikacji (Error Stream);
  3. może automatycznie określić schemat danych wejściowych (w razie potrzeby można go ręcznie przedefiniować).

Nie jest to tania usługa - 0.11 USD za godzinę pracy, dlatego należy z niej korzystać ostrożnie i usuwać ją po zakończeniu.

Podłączmy aplikację do źródła danych:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Wybierz strumień, z którym będziemy się łączyć (airline_tickets):

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Następnie musisz dołączyć nową rolę IAM, aby aplikacja mogła czytać ze strumienia i zapisywać w strumieniu. Aby to zrobić, wystarczy nie zmieniać niczego w bloku Uprawnienia dostępu:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Teraz poprośmy o odkrycie schematu danych w strumieniu, w tym celu należy kliknąć przycisk „Odkryj schemat”. W rezultacie rola IAM zostanie zaktualizowana (utworzona zostanie nowa) i uruchomiona zostanie detekcja schematu z danych, które już nadeszły w strumieniu:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Teraz musisz przejść do edytora SQL. Po kliknięciu tego przycisku pojawi się okno z prośbą o uruchomienie aplikacji - wybierz co chcesz uruchomić:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Wstaw następujące proste zapytanie do okna edytora SQL i kliknij Zapisz i uruchom SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

W relacyjnych bazach danych do pracy z tabelami używa się instrukcji INSERT w celu dodawania rekordów oraz instrukcji SELECT w celu wysyłania zapytań o dane. W Amazon Kinesis Data Analytics pracujesz ze strumieniami (STREAM) i pompami (PUMP) — ciągłymi żądaniami wstawiania, które wstawiają dane z jednego strumienia w aplikacji do innego strumienia.

Przedstawione powyżej zapytanie SQL wyszukuje bilety Aeroflotu w cenie poniżej pięciu tysięcy rubli. Wszystkie rekordy spełniające te warunki zostaną umieszczone w strumieniu DESTINATION_SQL_STREAM.

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
W bloku Destination wybierz strumień special_stream, a w rozwijanej liście Nazwa strumienia w aplikacji DESTINATION_SQL_STREAM:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Wynik wszystkich manipulacji powinien być podobny do poniższego obrazka:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa

Tworzenie i subskrybowanie tematu SNS

Przejdź do usługi prostego powiadamiania i utwórz tam nowy temat o nazwie Linie lotnicze:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Zapisz się do tego tematu i podaj numer telefonu komórkowego, na który będą wysyłane powiadomienia SMS:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa

Utwórz tabelę w DynamoDB

Aby przechowywać surowe dane ze strumienia Airlines_tickets, utwórzmy tabelę w DynamoDB o tej samej nazwie. Użyjemy record_id jako klucza podstawowego:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa

Tworzenie kolektora funkcji lambda

Stwórzmy funkcję lambda o nazwie Collector, której zadaniem będzie odpytywanie strumienia Airlines_tickets i w przypadku znalezienia w nim nowych rekordów wstawienie tych rekordów do tabeli DynamoDB. Oczywiście oprócz praw domyślnych ta lambda musi mieć dostęp do odczytu strumienia danych Kinesis i dostępu do zapisu w DynamoDB.

Tworzenie roli IAM dla funkcji lambda zbierającej
Najpierw utwórzmy nową rolę IAM dla lambdy o nazwie Lambda-TicketsProcessingRole:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
W przykładzie testowym całkiem odpowiednie są wstępnie skonfigurowane zasady AmazonKinesisReadOnlyAccess i AmazonDynamoDBFullAccess, jak pokazano na poniższym obrazku:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa

Ta lambda powinna zostać uruchomiona przez wyzwalacz z Kinesis, gdy nowe wpisy dotrą do linii lotniczej, dlatego musimy dodać nowy wyzwalacz:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Pozostaje tylko wstawić kod i zapisać lambdę.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Tworzenie powiadamiacza funkcji lambda

W podobny sposób tworzy się drugą funkcję lambda, która będzie monitorować drugi strumień (special_stream) i wysyłać powiadomienie do SNS. Zatem ta lambda musi mieć dostęp do odczytu z Kinesis i wysyłania wiadomości na dany temat SNS, które następnie zostaną rozesłane przez usługę SNS do wszystkich abonentów tego tematu (e-mail, SMS itp.).

Tworzenie roli IAM
Najpierw tworzymy rolę IAM Lambda-KinesisAlarm dla tej lambdy, a następnie przypisujemy tę rolę do tworzonej lambdy alarm_notifier:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa

Ta lambda powinna działać na wyzwalaczu dla nowych rekordów, aby wejść do specjalnego strumienia, więc musisz skonfigurować wyzwalacz w taki sam sposób, jak zrobiliśmy to w przypadku lambdy Kolekcjonera.

Aby ułatwić konfigurację tej lambdy, wprowadźmy nową zmienną środowiskową - TOPIC_ARN, w której umieszczamy ANR (Amazon Recourse Names) tematu Airlines:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
I wstaw kod lambda, to wcale nie jest skomplikowane:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Wygląda na to, że na tym ręczna konfiguracja systemu została zakończona. Pozostaje tylko przetestować i upewnić się, że wszystko poprawnie skonfigurowaliśmy.

Wdróż z kodu Terraform

Wymagane przygotowanie

Terraform to bardzo wygodne narzędzie typu open source do wdrażania infrastruktury z kodu. Ma własną składnię, która jest łatwa do nauczenia i zawiera wiele przykładów tego, jak i co wdrożyć. Edytor Atom lub Visual Studio Code posiada wiele przydatnych wtyczek, które ułatwiają pracę z Terraformem.

Możesz pobrać dystrybucję stąd. Szczegółowa analiza wszystkich możliwości Terraforma wykracza poza zakres tego artykułu, dlatego ograniczymy się do głównych punktów.

Jak zaczac

Pełny kod projektu to w moim repozytorium. Klonujemy repozytorium dla siebie. Przed rozpoczęciem upewnij się, że masz zainstalowany i skonfigurowany interfejs AWS CLI, ponieważ... Terraform będzie szukać danych uwierzytelniających w pliku ~/.aws/credentials.

Dobrą praktyką jest uruchomienie komendy plan przed wdrożeniem całej infrastruktury, aby zobaczyć, co Terraform aktualnie dla nas tworzy w chmurze:

terraform.exe plan

Zostaniesz poproszony o podanie numeru telefonu, na który chcesz wysyłać powiadomienia. Na tym etapie nie jest konieczne wprowadzanie go.

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Po przeanalizowaniu planu działania programu możemy przystąpić do tworzenia zasobów:

terraform.exe apply

Po wysłaniu tego polecenia ponownie zostaniesz poproszony o podanie numeru telefonu; wybierz „tak”, gdy pojawi się pytanie o faktyczne wykonanie czynności. Umożliwi to skonfigurowanie całej infrastruktury, przeprowadzenie całej niezbędnej konfiguracji EC2, wdrożenie funkcji lambda itp.

Po pomyślnym utworzeniu wszystkich zasobów za pomocą kodu Terraform należy wejść w szczegóły aplikacji Kinesis Analytics (niestety nie znalazłem jak to zrobić bezpośrednio z kodu).

Uruchom aplikację:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Następnie musisz jawnie ustawić nazwę strumienia w aplikacji, wybierając z listy rozwijanej:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Teraz wszystko jest gotowe do pracy.

Testowanie aplikacji

Niezależnie od tego, jak wdrożyłeś system, ręcznie czy za pomocą kodu Terraform, będzie on działał tak samo.

Logujemy się przez SSH do maszyny wirtualnej EC2, na której zainstalowany jest Kinesis Agent i uruchamiamy skrypt api_caller.py

sudo ./api_caller.py TOKEN

Wystarczy, że poczekasz na wiadomość SMS na Twój numer:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
SMS – wiadomość przychodzi na telefon już po niemal 1 minucie:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa
Pozostaje sprawdzić, czy rekordy zostały zapisane w bazie DynamoDB w celu późniejszej, bardziej szczegółowej analizy. Tabela Airlines_tickets zawiera w przybliżeniu następujące dane:

Integracja API Aviasales z Amazon Kinesis i prostota bezserwerowa

wniosek

W ramach prowadzonych prac zbudowano system przetwarzania danych on-line w oparciu o Amazon Kinesis. Rozważono opcje wykorzystania Kinesis Agent w połączeniu z Kinesis Data Streams i analityką w czasie rzeczywistym Kinesis Analytics za pomocą poleceń SQL, a także interakcję Amazon Kinesis z innymi usługami AWS.

Powyższy system wdrożyliśmy na dwa sposoby: dość długi, ręczny i szybki z kodu Terraform.

Dostępny jest cały kod źródłowy projektu w moim repozytorium GitHub, radzę się z tym zapoznać.

Chętnie podejmę dyskusję na temat artykułu, czekam na Wasze komentarze. Liczę na konstruktywną krytykę.

Życzę powodzenia!

Źródło: www.habr.com

Dodaj komentarz