Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika

Pozdravljeni, Habr!

Ali radi letite z letali? Všeč mi je, a med samoizolacijo sem se zaljubil tudi v analiziranje podatkov o letalskih vozovnicah enega znanega vira - Aviasales.

Danes bomo analizirali delo Amazon Kinesis, zgradili pretočni sistem z analitiko v realnem času, namestili bazo podatkov Amazon DynamoDB NoSQL kot glavno shrambo podatkov in nastavili SMS obvestila za zanimive vstopnice.

Vsi detajli so pod krojem! Pojdi!

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika

Predstavitev

Za primer potrebujemo dostop do Aviasales API. Dostop do njega je na voljo brezplačno in brez omejitev; samo registrirati se morate v razdelku »Razvijalci«, da prejmete žeton API za dostop do podatkov.

Glavni namen tega članka je podati splošno razumevanje uporabe pretakanja informacij v AWS; upoštevamo, da podatki, ki jih vrne uporabljeni API, niso strogo posodobljeni in se prenašajo iz predpomnilnika, ki je oblikovan na podlagi iskanj uporabnikov spletnih mest Aviasales.ru in Jetradar.com v zadnjih 48 urah.

Kinesis-agent, nameščen na proizvodnem stroju, prejet prek API-ja, bo samodejno razčlenil in posredoval podatke v želeni tok prek Kinesis Data Analytics. Neobdelana različica tega toka bo zapisana neposredno v trgovino. Shranjevanje neobdelanih podatkov, uvedeno v DynamoDB, bo omogočilo globljo analizo vstopnic prek orodij BI, kot je AWS Quick Sight.

Upoštevali bomo dve možnosti za postavitev celotne infrastrukture:

  • Ročno - preko AWS Management Console;
  • Infrastruktura iz kode Terraform je za lene avtomatizatorje;

Arhitektura razvitega sistema

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Uporabljene komponente:

  • Aviasales API — podatki, ki jih vrne ta API, bodo uporabljeni za vse nadaljnje delo;
  • Primerek proizvajalca EC2 — običajni virtualni stroj v oblaku, na katerem se bo generiral vhodni podatkovni tok:
    • Agent Kinesis je aplikacija Java, nameščena lokalno na napravi, ki omogoča preprost način zbiranja in pošiljanja podatkov v Kinesis (Kinesis Data Streams ali Kinesis Firehose). Agent nenehno spremlja nabor datotek v določenih imenikih in pošilja nove podatke v Kinesis;
    • API klicateljski skript — Skript Python, ki pošlje zahteve API-ju in postavi odgovor v mapo, ki jo nadzoruje Kinesis Agent;
  • Podatkovni tokovi Kinesis — storitev pretakanja podatkov v realnem času s širokimi zmožnostmi skaliranja;
  • Analitika kineze je storitev brez strežnika, ki poenostavi analizo pretočnih podatkov v realnem času. Amazon Kinesis Data Analytics konfigurira vire aplikacije in se samodejno prilagodi za obdelavo katere koli količine dohodnih podatkov;
  • AWS Lambda — storitev, ki vam omogoča zagon kode brez varnostnega kopiranja ali nastavitve strežnikov. Vsa računalniška moč se samodejno poveča za vsak klic;
  • Amazon DynamoDB - Podatkovna baza parov ključ-vrednost in dokumentov, ki zagotavlja zakasnitev manj kot 10 milisekund pri izvajanju v katerem koli obsegu. Ko uporabljate DynamoDB, vam ni treba zagotoviti, popraviti ali upravljati nobenih strežnikov. DynamoDB samodejno skalira tabele, da prilagodi količino razpoložljivih virov in ohrani visoko zmogljivost. Sistemska administracija ni potrebna;
  • Amazonska SNS - popolnoma upravljana storitev za pošiljanje sporočil po modelu izdajatelj-naročnik (Pub/Sub), s katero lahko izolirate mikrostoritve, porazdeljene sisteme in brezstrežniške aplikacije. SNS se lahko uporablja za pošiljanje informacij končnim uporabnikom prek mobilnih potisnih obvestil, sporočil SMS in e-pošte.

Začetno usposabljanje

Za posnemanje pretoka podatkov sem se odločil uporabiti informacije o letalskih vozovnicah, ki jih vrne API Aviasales. IN dokumentacijo precej obsežen seznam različnih metod, vzemimo eno od njih - "Koledar mesečnih cen", ki vrne cene za vsak dan v mesecu, razvrščene po številu prenosov. Če v zahtevku ne navedete meseca iskanja, se vrnejo podatki za mesec, ki sledi tekočemu.

Torej, registrirajmo se in pridobimo svoj žeton.

Spodaj je primer zahteve:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Zgornji način prejemanja podatkov iz API-ja z navedbo žetona v zahtevi bo deloval, vendar raje posredujem žeton dostopa skozi glavo, zato bomo to metodo uporabili v skriptu api_caller.py.

Vzorec odgovora:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Zgornji primer odgovora API-ja prikazuje vozovnico iz St. Petersburga v Phuk ... Oh, kakšne sanje ...
Ker sem iz Kazana in je Phuket zdaj "samo sanje", poiščimo karte od Sankt Peterburga do Kazana.

Predpostavlja, da že imate račun AWS. Takoj želim posebej opozoriti, da Kinesis in pošiljanje obvestil preko SMS nista vključena v letno Brezplačna stopnja (brezplačna uporaba). Toda tudi kljub temu je z nekaj dolarji v mislih povsem mogoče sestaviti predlagani sistem in se z njim igrati. In seveda ne pozabite izbrisati vseh virov, ko jih ne potrebujete več.

Na srečo bosta funkciji DynamoDb in lambda za nas brezplačni, če dosežemo svoje mesečne brezplačne omejitve. Na primer za DynamoDB: 25 GB prostora za shranjevanje, 25 WCU/RCU in 100 milijonov poizvedb. In milijon klicev lambda funkcije na mesec.

Ročna namestitev sistema

Nastavitev podatkovnih tokov Kinesis

Pojdimo v storitev Kinesis Data Streams in ustvarimo dva nova toka, en shard za vsakega.

Kaj je shard?
Shard je osnovna enota za prenos podatkov toka Amazon Kinesis. En segment zagotavlja vhodni prenos podatkov s hitrostjo 1 MB/s in izhodni prenos podatkov s hitrostjo 2 MB/s. En segment podpira do 1000 vnosov PUT na sekundo. Ko ustvarjate podatkovni tok, morate določiti potrebno število segmentov. Ustvarite lahko na primer tok podatkov z dvema segmentoma. Ta tok podatkov bo zagotavljal prenos vhodnih podatkov pri 2 MB/s in prenos izhodnih podatkov pri 4 MB/s ter podpira do 2000 zapisov PUT na sekundo.

Več kot je drobcev v vašem toku, večja je njegova prepustnost. Načeloma se tokovi skalirajo tako - z dodajanjem shardov. Toda več kot imate drobcev, višja je cena. Vsak delček stane 1,5 centa na uro in dodatnih 1.4 centa za vsak milijon enot tovora PUT.

Ustvarimo nov tok z imenom letalske_vozovnice, 1 shard mu bo dovolj:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Zdaj pa ustvarimo še eno nit z imenom poseben_tok:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika

Nastavitev proizvajalca

Za analizo naloge je dovolj, da kot proizvajalec podatkov uporabite običajni primerek EC2. Ni nujno, da gre za zmogljiv in drag virtualni stroj; točkovni t2.micro bo povsem v redu.

Pomembna opomba: na primer, uporabite sliko - Amazon Linux AMI 2018.03.0, ima manj nastavitev za hiter zagon Kinesis Agenta.

Pojdite na storitev EC2, ustvarite nov virtualni stroj, izberite želeni AMI z vrsto t2.micro, ki je vključen v Free Tier:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Da bi novo ustvarjeni virtualni stroj lahko komuniciral s storitvijo Kinesis, mora imeti za to podeljene pravice. Najboljši način za to je dodelitev vloge IAM. Zato morate na zaslonu 3. korak: Konfiguracija podrobnosti primerka izbrati Ustvari novo vlogo IAM:

Ustvarjanje vloge IAM za EC2
Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
V oknu, ki se odpre, izberite, da ustvarjamo novo vlogo za EC2, in pojdite na razdelek Dovoljenja:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Z uporabo primera usposabljanja se nam ni treba spuščati v vse zapletenosti natančne konfiguracije pravic do virov, zato bomo izbrali pravilnike, ki jih je vnaprej konfiguriral Amazon: AmazonKinesisFullAccess in CloudWatchFullAccess.

Dajmo tej vlogi kakšno smiselno ime, na primer: EC2-KinesisStreams-FullAccess. Rezultat mora biti enak, kot je prikazano na spodnji sliki:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Ko ustvarite to novo vlogo, je ne pozabite priložiti ustvarjenemu primerku navideznega stroja:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Na tem zaslonu ne spremenimo ničesar drugega in se premaknemo na naslednja okna.

Nastavitve trdega diska lahko pustite kot privzete, prav tako oznake (čeprav je uporaba oznak dobra praksa, dajte primerku vsaj ime in navedite okolje).

Zdaj smo na zavihku 6. korak: Konfigurirajte varnostno skupino, kjer morate ustvariti novo ali določiti obstoječo varnostno skupino, ki vam omogoča povezavo prek ssh (vrata 22) s primerkom. Tam izberite Vir -> Moj IP in lahko zaženete primerek.

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Takoj, ko preklopi v stanje delovanja, se lahko poskusite z njim povezati prek ssh.

Za delo s Kinesis Agentom morate po uspešni povezavi z napravo v terminal vnesti naslednje ukaze:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Ustvarimo mapo za shranjevanje odgovorov API-ja:

sudo mkdir /var/log/airline_tickets

Preden zaženete agenta, morate konfigurirati njegovo konfiguracijo:

sudo vim /etc/aws-kinesis/agent.json

Vsebina datoteke agent.json bi morala izgledati takole:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Kot je razvidno iz konfiguracijske datoteke, bo agent spremljal datoteke s končnico .log v imeniku /var/log/airline_tickets/, jih razčlenil in prenesel v tok airline_tickets.

Ponovno zaženemo storitev in se prepričamo, da deluje in deluje:

sudo service aws-kinesis-agent restart

Zdaj pa prenesimo skript Python, ki bo zahteval podatke iz API-ja:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skript api_caller.py zahteva podatke od Aviasales in shrani prejeti odgovor v imenik, ki ga skenira agent Kinesis. Izvedba tega skripta je precej standardna, obstaja razred TicketsApi, ki vam omogoča asinhrono vlečenje API-ja. Temu razredu posredujemo glavo z žetonom in zahtevamo parametre:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Za preizkus pravilnih nastavitev in delovanja agenta poskusno zaženimo skript api_caller.py:

sudo ./api_caller.py TOKEN

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
In pogledamo rezultat dela v dnevnikih agenta in na zavihku Monitoring v podatkovnem toku airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Kot lahko vidite, vse deluje in Kinesis Agent uspešno pošilja podatke v tok. Zdaj pa konfigurirajmo potrošnika.

Nastavitev Kinesis Data Analytics

Preidimo na osrednjo komponento celotnega sistema – ustvarite novo aplikacijo v Kinesis Data Analytics z imenom kinesis_analytics_airlines_app:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Kinesis Data Analytics vam omogoča izvajanje analize podatkov v realnem času iz Kinesis Streams z uporabo jezika SQL. Je popolnoma samodejna storitev (za razliko od Kinesis Streams), ki:

  1. omogoča ustvarjanje novih tokov (Output Stream) na podlagi zahtev izvornih podatkov;
  2. zagotavlja tok z napakami, ki so se pojavile med delovanjem aplikacij (Error Stream);
  3. lahko samodejno določi shemo vhodnih podatkov (po potrebi jo lahko ročno redefiniramo).

To ni poceni storitev - 0.11 USD na uro dela, zato jo uporabljajte previdno in jo izbrišite, ko končate.

Povežimo aplikacijo z virom podatkov:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Izberite tok, s katerim se bomo povezali (letalske_vozovnice):

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Nato morate priložiti novo vlogo IAM, da lahko aplikacija bere iz toka in piše v tok. Če želite to narediti, je dovolj, da v bloku dovoljenj za dostop ne spremenite ničesar:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Zdaj pa zahtevajmo odkritje podatkovne sheme v toku; za to kliknite gumb »Odkrij shemo«. Posledično bo vloga IAM posodobljena (ustvarjena bo nova) in zaznavanje sheme se bo zagnalo iz podatkov, ki so že prispeli v tok:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Zdaj morate iti v urejevalnik SQL. Ko kliknete na ta gumb, se prikaže okno, v katerem morate zagnati aplikacijo – izberite, kaj želite zagnati:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
V okno urejevalnika SQL vstavite naslednjo preprosto poizvedbo in kliknite Shrani in zaženi SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

V relacijskih bazah podatkov delate s tabelami z uporabo stavkov INSERT za dodajanje zapisov in stavkov SELECT za poizvedovanje podatkov. V storitvi Amazon Kinesis Data Analytics delate s tokovi (STREAMs) in črpalkami (PUMPs) – neprekinjene zahteve za vstavljanje, ki vstavljajo podatke iz enega toka v aplikaciji v drug tok.

Zgoraj predstavljena poizvedba SQL išče vozovnice Aeroflota po ceni pod pet tisoč rubljev. Vsi zapisi, ki izpolnjujejo te pogoje, bodo postavljeni v tok DESTINATION_SQL_STREAM.

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
V bloku Destination izberite tok special_stream in na spustnem seznamu In-application stream name DESTINATION_SQL_STREAM:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Rezultat vseh manipulacij bi moral biti podoben spodnji sliki:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika

Ustvarjanje in naročanje na temo SNS

Pojdite na Simple Notification Service in tam ustvarite novo temo z imenom Airlines:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Naročite se na to temo in navedite številko mobilnega telefona, na katero bodo poslana SMS obvestila:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika

Ustvarite tabelo v DynamoDB

Za shranjevanje neobdelanih podatkov iz njihovega toka airline_tickets ustvarimo tabelo v DynamoDB z istim imenom. Kot primarni ključ bomo uporabili record_id:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika

Ustvarjanje zbiralnika funkcij lambda

Ustvarimo lambda funkcijo, imenovano Collector, katere naloga bo anketirati tok airline_tickets in, če so tam najdeni novi zapisi, vstaviti te zapise v tabelo DynamoDB. Očitno mora imeti ta lambda poleg privzetih pravic dostop za branje do toka podatkov Kinesis in dostop za pisanje do DynamoDB.

Ustvarjanje vloge IAM za funkcijo zbiralnika lambda
Najprej ustvarimo novo vlogo IAM za lambdo z imenom Lambda-TicketsProcessingRole:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Za testni primer sta prednastavljena pravilnika AmazonKinesisReadOnlyAccess in AmazonDynamoDBFullAccess precej primerna, kot je prikazano na spodnji sliki:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika

To lambdo bi moral zagnati sprožilec Kinesis, ko novi vnosi vstopijo v airline_stream, zato moramo dodati nov sprožilec:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Ostane le še vstavljanje kode in shranjevanje lambde.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Ustvarjanje obvestila o funkciji lambda

Druga lambda funkcija, ki bo spremljala drugi tok (special_stream) in poslala obvestilo SNS, je ustvarjena na podoben način. Zato mora ta lambda imeti dostop do branja iz Kinesisa in pošiljanja sporočil v določeno temo SNS, ki jih bo nato storitev SNS poslala vsem naročnikom te teme (e-pošta, SMS itd.).

Ustvarjanje vloge IAM
Najprej ustvarimo vlogo IAM Lambda-KinesisAlarm za to lambda, nato pa to vlogo dodelimo lambda alarm_notifier, ki se ustvarja:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika

Ta lambda bi morala delovati na sprožilec za vstop novih zapisov v special_stream, zato morate konfigurirati sprožilec na enak način, kot smo naredili za lambdo Collector.

Da bi olajšali konfiguracijo te lambde, uvedimo novo spremenljivko okolja - TOPIC_ARN, kamor postavimo ANR (imena virov Amazon) teme Airlines:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
In vstavite lambda kodo, sploh ni zapleteno:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Zdi se, da je tukaj ročna konfiguracija sistema končana. Vse kar ostane je, da preizkusimo in se prepričamo, da smo vse pravilno konfigurirali.

Razmestite iz kode Terraform

Potrebna priprava

Terraform je zelo priročno odprtokodno orodje za uvajanje infrastrukture iz kode. Ima lastno sintakso, ki se je enostavno naučiti, in ima veliko primerov, kako in kaj uvesti. Urejevalnik Atom ali Visual Studio Code ima veliko priročnih vtičnikov, ki olajšajo delo s Terraformom.

Distribucijo lahko prenesete zato. Podrobna analiza vseh zmogljivosti Terraform presega obseg tega članka, zato se bomo omejili na glavne točke.

Kako začeti

Celotna koda projekta je v mojem skladišču. Repozitorij kloniramo sebi. Preden začnete, se morate prepričati, da imate nameščen in konfiguriran AWS CLI, ker ... Terraform bo iskal poverilnice v datoteki ~/.aws/credentials.

Dobra praksa je, da pred uvedbo celotne infrastrukture zaženete ukaz plan, da vidite, kaj Terraform trenutno ustvarja za nas v oblaku:

terraform.exe plan

Pozvani boste, da vnesete telefonsko številko, na katero želite pošiljati obvestila. Na tej stopnji ga ni treba vnesti.

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Ko analiziramo načrt delovanja programa, lahko začnemo ustvarjati vire:

terraform.exe apply

Po pošiljanju tega ukaza boste znova pozvani, da vnesete telefonsko številko; izberite »da«, ko se prikaže vprašanje o dejanskem izvajanju dejanj. To vam bo omogočilo nastavitev celotne infrastrukture, izvedbo vseh potrebnih konfiguracij EC2, uvedbo lambda funkcij itd.

Ko so vsi viri uspešno ustvarjeni s kodo Terraform, se morate poglobiti v podrobnosti aplikacije Kinesis Analytics (žal nisem našel, kako to narediti neposredno iz kode).

Zaženite aplikacijo:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Po tem morate izrecno nastaviti ime toka v aplikaciji tako, da izberete na spustnem seznamu:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Zdaj je vse pripravljeno za odhod.

Testiranje aplikacije

Ne glede na to, kako ste uvedli sistem, ročno ali prek kode Terraform, bo deloval enako.

Preko SSH se prijavimo na virtualni stroj EC2, kjer je nameščen Kinesis Agent in zaženemo skripto api_caller.py

sudo ./api_caller.py TOKEN

Vse kar morate storiti je, da počakate na SMS na vašo številko:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
SMS - sporočilo prispe na vaš telefon v skoraj 1 minuti:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Videti je treba, ali so bili zapisi shranjeni v bazi podatkov DynamoDB za kasnejšo, podrobnejšo analizo. Tabela airline_tickets vsebuje približno naslednje podatke:

Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika

Zaključek

V okviru opravljenega dela je bil zgrajen sistem za spletno obdelavo podatkov, ki temelji na Amazon Kinesis. Upoštevane so bile možnosti za uporabo Kinesis Agenta v povezavi s Kinesis Data Streams in analitiko Kinesis Analytics v realnem času z uporabo ukazov SQL ter interakcija Amazon Kinesis z drugimi storitvami AWS.

Zgornji sistem smo uvedli na dva načina: precej dolgega ročnega in hitrega iz kode Terraform.

Na voljo je vsa izvorna koda projekta v mojem repozitoriju GitHub, predlagam, da se z njim seznanite.

Z veseljem razpravljam o članku, veselim se vaših komentarjev. Upam na konstruktivno kritiko.

Želim vam uspeh!

Vir: www.habr.com

Dodaj komentar