Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség

Szia Habr!

Szeretsz repülni? Imádom, de az önizoláció során beleszerettem a repülőjegyek adatainak elemzésébe is egy jól ismert forrásból - az Aviasalesből.

Ma elemezzük az Amazon Kinesis munkáját, valós idejű elemzéssel streaming rendszert építünk, fő adattárolóként telepítjük az Amazon DynamoDB NoSQL adatbázist, és beállítunk SMS értesítéseket az érdekes jegyekről.

Minden részlet a vágás alatt! Megy!

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség

Bevezetés

A példa esetében hozzáférésre van szükségünk Aviasales API. Hozzáférés ingyenes és korlátozások nélkül biztosított; csak regisztrálnia kell a „Fejlesztők” részben, hogy megkapja az API tokenjét az adatok eléréséhez.

Ennek a cikknek az a fő célja, hogy általános megértést adjon az információs adatfolyam AWS-ben történő használatáról; figyelembe vesszük, hogy a használt API által visszaküldött adatok nem szigorúan naprakészek, és a gyorsítótárból kerülnek továbbításra, amely az Aviasales.ru és a Jetradar.com webhelyek felhasználói által az elmúlt 48 órában végzett keresések alapján jött létre.

A gyártó gépre telepített, az API-n keresztül kapott Kinesis-agent automatikusan elemzi és továbbítja az adatokat a kívánt adatfolyamhoz a Kinesis Data Analytics segítségével. Ennek az adatfolyamnak a nyers verziója közvetlenül az áruházba lesz írva. A DynamoDB-ben telepített nyers adattárolás mélyebb jegyelemzést tesz lehetővé BI-eszközökön, például az AWS Quick Sighton keresztül.

Két lehetőséget fogunk mérlegelni a teljes infrastruktúra kiépítésére:

  • Kézi - az AWS Management Console-on keresztül;
  • A Terraform kódból származó infrastruktúra a lusta automatáknak való;

A kidolgozott rendszer felépítése

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Felhasznált komponensek:

  • Aviasales API — az ezen API által visszaadott adatokat minden további munkához felhasználjuk;
  • EC2 gyártói példány — egy szokásos virtuális gép a felhőben, amelyen a bemeneti adatfolyam generálásra kerül:
    • Kinezis ügynök egy helyileg a gépre telepített Java alkalmazás, amely egyszerű módot biztosít az adatok összegyűjtésére és küldésére a Kinesisnek (Kinesis Data Streams vagy Kinesis Firehose). Az ügynök folyamatosan figyeli a megadott könyvtárakban található fájlkészletet, és új adatokat küld a Kinesisnek;
    • API Caller Script — Python-szkript, amely kéréseket küld az API-nak, és a választ egy mappába helyezi, amelyet a Kinesis Agent figyel;
  • Kinesis adatfolyamok — valós idejű adatfolyam szolgáltatás széles skálázási lehetőségekkel;
  • Kinesis Analytics egy szerver nélküli szolgáltatás, amely leegyszerűsíti a streaming adatok valós idejű elemzését. Az Amazon Kinesis Data Analytics konfigurálja az alkalmazás-erőforrásokat, és automatikusan skálázja a bejövő adatmennyiség kezelésére;
  • AWS Lambda — olyan szolgáltatás, amely lehetővé teszi kód futtatását biztonsági mentés vagy kiszolgálók beállítása nélkül. Az összes számítási teljesítmény automatikusan skálázódik minden híváshoz;
  • Amazon DynamoDB - Kulcs-érték párokat és dokumentumokat tartalmazó adatbázis, amely 10 ezredmásodpercnél kisebb késleltetést biztosít bármilyen léptékű futás esetén. A DynamoDB használatakor nem kell kiépíteni, javítani vagy kezelni egyetlen kiszolgálót sem. A DynamoDB automatikusan méretezi a táblákat a rendelkezésre álló erőforrások mennyiségének beállításához és a magas teljesítmény fenntartásához. Nincs szükség rendszeradminisztrációra;
  • Amazon SNS - egy teljes körűen felügyelt szolgáltatás a kiadó-előfizető (Pub/Sub) modellt használó üzenetküldéshez, amellyel mikroszolgáltatásokat, elosztott rendszereket és szerver nélküli alkalmazásokat izolálhat. Az SNS segítségével információkat küldhetünk a végfelhasználóknak mobil push értesítéseken, SMS-eken és e-maileken keresztül.

Kezdeti képzés

Az adatfolyam emulálásához úgy döntöttem, hogy az Aviasales API által visszaadott repülőjegy-információkat használom. BAN BEN dokumentáció a különböző módszerek meglehetősen kiterjedt listája, vegyünk ezek közül egyet - a „Havi árnaptárat”, amely a hónap minden napjára visszaadja az árakat, az átutalások száma szerint csoportosítva. Ha a kérésben nem adja meg a keresési hónapot, az aktuális hónapot követő hónapra vonatkozó információkat adjuk vissza.

Tehát regisztráljunk, és szerezzük be a tokenünket.

Alább látható egy példa kérés:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

A fenti módszer az API-tól való adatok fogadására a kérésben egy token megadásával működik, de én inkább a fejlécen keresztül szeretném átadni a hozzáférési tokent, ezért ezt a módszert fogjuk használni az api_caller.py szkriptben.

Válasz példa:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

A fenti példa API-válasz egy Szentpétervárról Phukba tartó jegyet mutat... Ó, micsoda álom...
Mivel én kazanyi vagyok, Phuket pedig „csak álom”, keressünk jegyeket Szentpétervárról Kazanyba.

Feltételezi, hogy már rendelkezik AWS-fiókkal. Azonnal külön felhívom a figyelmet arra, hogy a Kinesis és az SMS-ben történő értesítések küldése nem szerepel a évi Ingyenes szint (ingyenes használat). De még ennek ellenére is, néhány dollárt szem előtt tartva, teljesen lehetséges a javasolt rendszer felépítése és játszani vele. És természetesen ne felejtse el törölni az összes erőforrást, miután már nincs rájuk szükség.

Szerencsére a DynamoDb és a lambda funkciók ingyenesek lesznek számunkra, ha elérjük a havi ingyenes limiteket. Például a DynamoDB esetében: 25 GB tárhely, 25 WCU/RCU és 100 millió lekérdezés. És egymillió lambda függvényhívás havonta.

Kézi rendszertelepítés

Kinesis adatfolyamok beállítása

Lépjünk a Kinesis Data Streams szolgáltatáshoz, és hozzunk létre két új adatfolyamot, mindegyikhez egy-egy szilánkot.

Mi az a szilánk?
A szilánk az Amazon Kinesis adatfolyam alapvető adatátviteli egysége. Az egyik szegmens 1 MB/s-os bemeneti adatátvitelt, 2 MB/s-os kimeneti adatátvitelt biztosít. Egy szegmens akár 1000 PUT bejegyzést támogat másodpercenként. Adatfolyam létrehozásakor meg kell adni a szükséges számú szegmenst. Létrehozhat például két szegmensből álló adatfolyamot. Ez az adatfolyam 2 MB/s-os bemeneti adatátvitelt, 4 MB/s-os kimeneti adatátvitelt biztosít, akár 2000 PUT-rekordot támogat másodpercenként.

Minél több szilánk van az adatfolyamban, annál nagyobb az átviteli sebessége. Elvileg így skálázódnak az áramlások – szilánkok hozzáadásával. De minél több szilánkja van, annál magasabb az ára. Minden egyes szilánk ára 1,5 cent óránként, és további 1.4 cent minden millió PUT rakományegység után.

Hozzunk létre egy új adatfolyamot a névvel repülőjegyek, 1 szilánk is elég lesz neki:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Most hozzunk létre egy másik szálat a névvel speciális_folyam:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség

Termelői beállítás

Egy feladat elemzéséhez elegendő egy normál EC2 példányt használni adatelőállítóként. Nem kell egy nagy teljesítményű, drága virtuális gépnek lennie; egy spot t2.micro is jól működik.

Fontos megjegyzés: például az Amazon Linux AMI 2018.03.0 image-t érdemes használnia, amely kevesebb beállítást tartalmaz a Kinesis Agent gyors elindításához.

Lépjen az EC2 szolgáltatásba, hozzon létre egy új virtuális gépet, válassza ki a kívánt AMI-t t2.micro típussal, amely a Free Tier része:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Ahhoz, hogy az újonnan létrehozott virtuális gép kapcsolatba tudjon lépni a Kinesis szolgáltatással, ehhez jogokat kell adni neki. Ennek legjobb módja egy IAM-szerep hozzárendelése. Ezért a 3. lépés: Példány részleteinek konfigurálása képernyőn válassza ki a lehetőséget Hozzon létre új IAM-szerepet:

IAM-szerep létrehozása az EC2 számára
Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
A megnyíló ablakban válassza ki, hogy új szerepkört hozunk létre az EC2 számára, és lépjen az Engedélyek szakaszba:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
A képzési példát használva nem kell belemennünk az erőforrásjogok részletes beállításának minden bonyolultságába, ezért az Amazon által előre konfigurált házirendeket fogjuk kiválasztani: AmazonKinesisFullAccess és CloudWatchFullAccess.

Adjunk valami értelmes nevet ennek a szerepnek, például: EC2-KinesisStreams-FullAccess. Az eredménynek meg kell egyeznie az alábbi képen láthatóval:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Az új szerepkör létrehozása után ne felejtse el csatolni a létrehozott virtuálisgép-példányhoz:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Semmi mást nem változtatunk ezen a képernyőn, és továbblépünk a következő ablakokra.

A merevlemez beállításait meg lehet hagyni alapértelmezettként, csakúgy, mint a címkéket (bár célszerű a címkék használata, de legalább nevet adjon a példánynak és jelezze a környezetet).

Most a 6. lépés: Biztonsági csoport konfigurálása lapon vagyunk, ahol létre kell hoznia egy újat, vagy meg kell adnia a meglévő biztonsági csoportot, amely lehetővé teszi, hogy ssh-n keresztül (22-es port) csatlakozzon a példányhoz. Válassza a Forrás -> Saját IP-címet, és elindíthatja a példányt.

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Amint futó állapotba vált, megpróbálhatsz csatlakozni hozzá ssh-n keresztül.

A Kinesis Agent használatához a géphez való sikeres csatlakozás után a következő parancsokat kell beírnia a terminálba:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Hozzon létre egy mappát az API-válaszok mentéséhez:

sudo mkdir /var/log/airline_tickets

Az ügynök elindítása előtt konfigurálnia kell a konfigurációját:

sudo vim /etc/aws-kinesis/agent.json

Az agent.json fájl tartalmának így kell kinéznie:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Amint a konfigurációs fájlból látható, az ügynök figyeli a .log kiterjesztésű fájlokat a /var/log/airline_tickets/ könyvtárban, elemzi és átviszi az airline_tickets adatfolyamba.

Újraindítjuk a szolgáltatást, és megbizonyosodunk arról, hogy működőképes:

sudo service aws-kinesis-agent restart

Most töltsük le a Python-szkriptet, amely adatokat kér az API-tól:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Az api_caller.py szkript adatokat kér az Aviasalestől, és elmenti a kapott választ abba a könyvtárba, amelyet a Kinesis ügynök vizsgál. Ennek a szkriptnek a megvalósítása meglehetősen szabványos, van egy TicketsApi osztály, amely lehetővé teszi az API aszinkron húzását. Ennek az osztálynak átadunk egy fejlécet egy tokent, és paramétereket kérünk:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Az ügynök megfelelő beállításainak és működésének teszteléséhez teszteljük az api_caller.py szkriptet:

sudo ./api_caller.py TOKEN

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
És megnézzük a munka eredményét az Ügynöknaplókban és a Monitoring fülön a airline_tickets adatfolyamban:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Amint látja, minden működik, és a Kinesis Agent sikeresen küldi az adatokat a streamnek. Most konfiguráljuk a fogyasztót.

A Kinesis Data Analytics beállítása

Térjünk át a teljes rendszer központi elemére – hozzunk létre egy új alkalmazást a Kinesis Data Analyticsben kinesis_analytics_airlines_app néven:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
A Kinesis Data Analytics lehetővé teszi, hogy valós idejű adatelemzést végezzen a Kinesis Streamsből az SQL nyelv használatával. Ez egy teljesen automatikus skálázó szolgáltatás (ellentétben a Kinesis Streams-szel), amely:

  1. lehetővé teszi új adatfolyamok (Output Stream) létrehozását a forrásadatokra vonatkozó kérések alapján;
  2. adatfolyamot biztosít az alkalmazások futása közben fellépő hibákkal (Hibafolyam);
  3. automatikusan meghatározhatja a bemeneti adatsémát (szükség esetén manuálisan újradefiniálható).

Ez nem olcsó szolgáltatás – 0.11 USD munkaóránként, ezért óvatosan használja, és törölje, ha végzett.

Csatlakoztassuk az alkalmazást az adatforráshoz:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Válassza ki azt a streamet, amelyhez csatlakozni fogunk (airline_tickets):

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Ezután csatolnia kell egy új IAM-szerepkört, hogy az alkalmazás olvasni tudjon az adatfolyamból, és írni tudjon az adatfolyamba. Ehhez elég, ha nem módosít semmit a hozzáférési engedélyek blokkban:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Most kérjük az adatfolyamban lévő adatséma felfedezését; ehhez kattintson a „Séma felfedezése” gombra. Ennek eredményeként az IAM szerepkör frissül (újat hoz létre), és elindul a séma észlelése a streambe már érkezett adatokból:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Most az SQL-szerkesztőbe kell lépnie. Amikor erre a gombra kattint, megjelenik egy ablak, amely arra kéri, hogy indítsa el az alkalmazást - válassza ki, mit szeretne elindítani:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Illessze be a következő egyszerű lekérdezést az SQL-szerkesztő ablakába, majd kattintson az SQL mentése és futtatása gombra:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

A relációs adatbázisokban a táblákkal dolgozhat INSERT utasításokkal rekordok hozzáadásához és SELECT utasítással az adatok lekérdezéséhez. Az Amazon Kinesis Data Analytics szolgáltatásban adatfolyamokkal (STREAM-okkal) és pumpákkal (PUMP-okkal) dolgozik – olyan folyamatos beszúrási kérelmekkel, amelyek az alkalmazás egyik adatfolyamából egy másik adatfolyamba szúrnak be adatokat.

A fent bemutatott SQL lekérdezés ötezer rubel alatti áron keres Aeroflot jegyeket. Minden olyan rekord, amely megfelel ezeknek a feltételeknek, a DESTINATION_SQL_STREAM adatfolyamba kerül.

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
A Cél blokkban válassza ki a special_stream adatfolyamot, és az Alkalmazáson belüli adatfolyam neve DESTINATION_SQL_STREAM legördülő listában:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Az összes manipuláció eredményének az alábbi képhez hasonlónak kell lennie:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség

SNS téma létrehozása és előfizetés

Nyissa meg az Egyszerű értesítési szolgáltatást, és hozzon létre egy új témát légitársaságok néven:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Iratkozzon fel erre a témára, és adja meg a mobiltelefonszámot, amelyre az SMS-értesítéseket küldi:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség

Hozzon létre egy táblázatot a DynamoDB-ben

A légitársaságok_jegyek adatfolyamából származó nyers adatok tárolásához hozzunk létre egy azonos nevű táblázatot a DynamoDB-ben. A record_id elsődleges kulcsot használjuk:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség

Lambda függvénygyűjtő készítése

Hozzunk létre egy lambda függvényt Collector néven, aminek az lesz a feladata, hogy lekérdezze az airline_tickets adatfolyamot, és ha ott új rekordok találhatók, beszúrjuk ezeket a rekordokat a DynamoDB táblába. Nyilvánvaló, hogy az alapértelmezett jogokon kívül ennek a lambdának olvasási hozzáféréssel kell rendelkeznie a Kinesis adatfolyamhoz és írási hozzáféréssel a DynamoDB-hez.

IAM szerepkör létrehozása a kollektor lambda függvényhez
Először is hozzunk létre egy új IAM-szerepet a lambdához Lambda-TicketsProcessingRole néven:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
A tesztpéldához az előre konfigurált AmazonKinesisReadOnlyAccess és AmazonDynamoDBFullAccess házirendek nagyon megfelelőek, amint az az alábbi képen látható:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség

Ezt a lambdát a Kinesis triggerrel kell elindítani, amikor új bejegyzések lépnek be az airline_streambe, ezért új triggert kell hozzáadnunk:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Már csak a kód beírása és a lambda mentése van hátra.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda funkcióértesítő létrehozása

Hasonló módon jön létre a második lambda funkció, amely a második adatfolyamot figyeli (special_stream) és értesítést küld az SNS-nek. Ezért ennek a lambdának hozzáféréssel kell rendelkeznie a Kinesisből való olvasáshoz és üzenetek küldéséhez egy adott SNS-témához, amelyeket aztán az SNS-szolgáltatás elküld a téma minden előfizetőjének (e-mail, SMS stb.).

IAM-szerep létrehozása
Először létrehozzuk a Lambda-KinesisAlarm IAM szerepkört ehhez a lambdához, majd hozzárendeljük ezt a szerepet a létrehozandó alarm_notifier lambdához:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség

Ennek a lambdának egy triggeren kell működnie, hogy új rekordok lépjenek be a special_streambe, ezért a triggert ugyanúgy kell beállítani, mint a Collector lambdánál.

A lambda konfigurálásának megkönnyítése érdekében vezessünk be egy új környezeti változót - TOPIC_ARN, ahol a légitársaságok témakör ANR-jét (Amazon Recourse Names) helyezzük el:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
És illessze be a lambda kódot, ez egyáltalán nem bonyolult:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Úgy tűnik, hogy itt fejeződött be a kézi rendszerkonfiguráció. Nincs más hátra, mint tesztelni, és megbizonyosodni arról, hogy mindent megfelelően konfiguráltunk.

Telepítés a Terraform kódból

Szükséges előkészület

Terraform egy nagyon kényelmes nyílt forráskódú eszköz az infrastruktúra kódból történő telepítéséhez. Saját szintaxisa van, amely könnyen megtanulható, és számos példája van arra, hogyan és mit kell telepíteni. Az Atom szerkesztő vagy a Visual Studio Code számos praktikus bővítményt tartalmaz, amelyek megkönnyítik a Terraformmal való munkát.

Letöltheti a disztribúciót ezért. A Terraform összes képességének részletes elemzése túlmutat e cikk keretein, ezért a főbb pontokra szorítkozunk.

Hogyan kezdjük

A projekt teljes kódja a az én tárhelyemben. Az adattárat magunkra klónozzuk. Mielőtt elkezdené, győződjön meg arról, hogy az AWS CLI telepítve és konfigurálva van, mert... A Terraform a hitelesítő adatokat a ~/.aws/credentials fájlban keresi.

Jó gyakorlat az, hogy a terv parancsot a teljes infrastruktúra üzembe helyezése előtt futtatjuk, hogy megnézzük, mit hoz létre a Terraform jelenleg a felhőben:

terraform.exe plan

Meg kell adnia egy telefonszámot, amelyre értesítéseket küldhet. Ebben a szakaszban nem szükséges megadni.

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
A program működési tervének elemzése után megkezdhetjük az erőforrások létrehozását:

terraform.exe apply

A parancs elküldése után ismét meg kell adnia egy telefonszámot; tárcsázza az „igen”-t, ha a műveletek tényleges végrehajtására vonatkozó kérdés jelenik meg. Ez lehetővé teszi a teljes infrastruktúra beállítását, az EC2 összes szükséges konfigurációjának elvégzését, a lambda funkciók telepítését stb.

Miután az összes erőforrást sikeresen létrehozta a Terraform kódon keresztül, be kell mennie a Kinesis Analytics alkalmazás részleteibe (sajnos közvetlenül a kódból nem találtam meg, hogyan kell ezt megtenni).

Indítsa el az alkalmazást:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Ezt követően kifejezetten be kell állítania az alkalmazáson belüli adatfolyam nevét a legördülő listából:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Most már minden készen áll.

Az alkalmazás tesztelése

Függetlenül attól, hogy hogyan telepítette a rendszert, manuálisan vagy Terraform kódon keresztül, ugyanúgy fog működni.

SSH-n keresztül bejelentkezünk arra az EC2 virtuális gépre, amelyre a Kinesis Agent telepítve van, és futtatjuk az api_caller.py szkriptet

sudo ./api_caller.py TOKEN

Nincs más dolgod, mint várni egy SMS-t a számodra:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
SMS - majdnem 1 percen belül üzenet érkezik a telefonra:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség
Meg kell nézni, hogy a rekordokat elmentették-e a DynamoDB adatbázisba későbbi, részletesebb elemzés céljából. A repülőjegyek táblázat körülbelül a következő adatokat tartalmazza:

Aviasales API integráció az Amazon Kinesis-szel és szerver nélküli egyszerűség

Következtetés

Az elvégzett munka során egy Amazon Kinesis alapú online adatfeldolgozó rendszer épült ki. Megfontolásra kerültek a Kinesis Agent és a Kinesis Data Streamek és a valós idejű elemzések Kinesis Analytics SQL-parancsok használatával történő használatának lehetőségei, valamint az Amazon Kinesis más AWS-szolgáltatásokkal való interakciója.

A fenti rendszert kétféleképpen telepítettük: egy meglehetősen hosszú kézi és egy gyors Terraform kódból.

A projekt összes forráskódja elérhető a GitHub adattáramban, azt javaslom, hogy ismerkedjen meg vele.

Szívesen megvitatom a cikket, várom észrevételeiket. Építő kritikát remélek.

Sok sikert kívánok!

Forrás: will.com

Hozzászólás