Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus

Hei Habr!

Pidätkö lentokoneiden lentämisestä? Rakastan sitä, mutta itseeristyksen aikana rakastuin myös lentolipputietojen analysointiin yhdestä tunnetusta lähteestä - Aviasalesista.

Tänään analysoimme Amazon Kinesiksen työtä, rakennamme suoratoistojärjestelmän reaaliaikaisen analytiikan avulla, asennamme Amazon DynamoDB NoSQL -tietokannan tärkeimmäksi tietovarastoksi ja asetamme tekstiviesti-ilmoitukset mielenkiintoisille lipuille.

Kaikki yksityiskohdat ovat leikkauksen alla! Mennä!

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus

Esittely

Esimerkiksi tarvitsemme pääsyn Aviasales API. Pääsy siihen tarjotaan maksutta ja rajoituksetta; sinun tarvitsee vain rekisteröityä "Kehittäjät" -osioon saadaksesi API-tunnuksesi, jotta voit käyttää tietoja.

Tämän artikkelin päätarkoituksena on antaa yleinen käsitys tiedon suoratoiston käytöstä AWS:ssä; otamme huomioon, että käytetyn API:n palauttamat tiedot eivät ole täysin ajan tasalla ja ne välitetään välimuistista, joka on muodostettu Aviasales.ru- ja Jetradar.com-sivustojen käyttäjien viimeisten 48 tunnin aikana tekemien hakujen perusteella.

Tuotantokoneeseen asennettu, API:n kautta vastaanotettu Kinesis-agentti jäsentää ja lähettää tiedot automaattisesti haluttuun tietovirtaan Kinesis Data Analyticsin kautta. Tämän streamin raakaversio kirjoitetaan suoraan kauppaan. DynamoDB:ssä käyttöön otettu raakatietojen tallennus mahdollistaa syvemmän lippuanalyysin BI-työkalujen, kuten AWS Quick Sightin, avulla.

Harkitsemme kahta vaihtoehtoa koko infrastruktuurin käyttöönottamiseksi:

  • Manuaalinen - AWS-hallintakonsolin kautta;
  • Terraform-koodin infrastruktuuri on laiskoja automatisoijia varten;

Kehitetyn järjestelmän arkkitehtuuri

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Käytetyt komponentit:

  • Aviasales API — tämän API:n palauttamia tietoja käytetään kaikkeen myöhempään työhön;
  • EC2-tuottajainstanssi — tavallinen virtuaalikone pilvessä, johon syötetietovirta luodaan:
    • Kinesis-agentti on koneelle paikallisesti asennettu Java-sovellus, joka tarjoaa helpon tavan kerätä ja lähettää tietoja Kinesikseen (Kinesis Data Streams tai Kinesis Firehose). Agentti tarkkailee jatkuvasti tiedostojoukkoa määritetyissä hakemistoissa ja lähettää uusia tietoja Kinesisille;
    • API Caller Script — Python-skripti, joka tekee pyyntöjä API:lle ja sijoittaa vastauksen kansioon, jota Kinesis Agent valvoo;
  • Kinesis-tietovirrat — reaaliaikainen datan suoratoistopalvelu, jossa on laajat skaalausominaisuudet;
  • Kinesis Analytics on palvelimeton palvelu, joka yksinkertaistaa suoratoistodatan analysointia reaaliajassa. Amazon Kinesis Data Analytics konfiguroi sovellusresurssit ja skaalautuu automaattisesti käsittelemään mitä tahansa saapuvaa dataa;
  • AWS Lambda — palvelu, jonka avulla voit suorittaa koodia ilman varmuuskopiointia tai palvelimien asentamista. Kaikki laskentateho skaalataan automaattisesti jokaiselle puhelulle;
  • Amazon DynamoDB - Tietokanta avainarvopareista ja asiakirjoista, jonka latenssi on alle 10 millisekuntia ajettaessa missä tahansa mittakaavassa. Kun käytät DynamoDB:tä, sinun ei tarvitse valmistaa, korjata tai hallita palvelimia. DynamoDB skaalaa taulukot automaattisesti säätääkseen käytettävissä olevien resurssien määrää ja ylläpitääkseen korkeaa suorituskykyä. Järjestelmänhallintaa ei vaadita;
  • Amazon SNS - täysin hallittu palvelu viestien lähettämiseen julkaisija-tilaaja (Pub/Sub) -mallilla, jolla voit eristää mikropalvelut, hajautetut järjestelmät ja palvelimettomat sovellukset. SNS-palvelua voidaan käyttää tietojen lähettämiseen loppukäyttäjille mobiili push-ilmoitusten, tekstiviestien ja sähköpostien kautta.

Alkukoulutus

Tietovirran emulointiin päätin käyttää Aviasales API:n palauttamia lentolipputietoja. SISÄÄN dokumentointi melko laaja luettelo eri tavoista, otetaan yksi niistä - "Kuukausihintakalenteri", joka palauttaa hinnat kullekin kuukauden päivälle ryhmiteltynä siirtojen lukumäärän mukaan. Jos et ilmoita pyynnössä hakukuukautta, tiedot palautetaan kuluvaa kuukautta seuraavalta kuukaudelta.

Joten rekisteröidytään ja hankitaan tunnus.

Esimerkkipyyntö on alla:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Yllä oleva tapa vastaanottaa tietoja API:lta määrittämällä pyyntöön tunnuksen toimii, mutta välitän mieluummin pääsytunnuksen otsikon kautta, joten käytämme tätä menetelmää api_caller.py-komentosarjassa.

Vastausesimerkki:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Yllä oleva esimerkki API-vastauksesta näyttää lipun Pietarista Phukiin... Voi mikä unelma...
Koska olen Kazanista ja Phuket on nyt "vain unelma", katsotaanpa lippuja Pietarista Kazaniin.

Se olettaa, että sinulla on jo AWS-tili. Haluan välittömästi kiinnittää erityistä huomiota siihen, että Kinesis ja ilmoitusten lähettäminen tekstiviestillä eivät sisälly vuosimaksuun. Ilmainen taso (ilmainen käyttö). Mutta vaikka tästä huolimatta, pari dollaria mielessä, on täysin mahdollista rakentaa ehdotettu järjestelmä ja pelata sillä. Ja tietysti älä unohda poistaa kaikkia resursseja, kun niitä ei enää tarvita.

Onneksi DynamoDb- ja lambda-toiminnot ovat meille ilmaisia, jos saavutamme kuukausittaiset ilmaisrajat. Esimerkiksi DynamoDB:lle: 25 Gt tallennustilaa, 25 WCU/RCU ja 100 miljoonaa kyselyä. Ja miljoona lambda-toimintokutsua kuukaudessa.

Manuaalinen järjestelmän käyttöönotto

Kinesis-tietovirtojen määrittäminen

Siirrytään Kinesis Data Streams -palveluun ja luodaan kaksi uutta striimiä, yksi sirpale kullekin.

Mikä on sirpale?
Sirpale on Amazon Kinesis -virran perustiedonsiirtoyksikkö. Yksi segmentti tarjoaa tulotiedonsiirron nopeudella 1 MB/s ja lähtötiedonsiirron nopeudella 2 MB/s. Yksi segmentti tukee jopa 1000 PUT-merkintää sekunnissa. Kun luot tietovirtaa, sinun on määritettävä tarvittava määrä segmenttejä. Voit esimerkiksi luoda tietovirran, jossa on kaksi segmenttiä. Tämä tietovirta tarjoaa tulotiedonsiirron nopeudella 2 Mt/s ja lähtötiedonsiirron nopeudella 4 Mt/s, ja se tukee jopa 2000 PUT-tietuetta sekunnissa.

Mitä enemmän sirpaleita streamissasi on, sitä suurempi on sen suorituskyky. Periaatteessa näin virtaukset skaalataan - lisäämällä sirpaleita. Mutta mitä enemmän sirpaleita sinulla on, sitä korkeampi hinta. Jokainen sirpale maksaa 1,5 senttiä tunnissa ja lisäksi 1.4 senttiä jokaisesta miljoonasta PUT-hyötykuormayksiköstä.

Luodaan uusi stream, jolla on nimi lentoliput, 1 sirpale riittää hänelle:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Luodaan nyt toinen ketju, jolla on nimi erityinen_virta:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus

Tuottajan asetukset

Tehtävän analysointiin riittää käyttää tavallista EC2-ilmentymää tiedontuottajana. Sen ei tarvitse olla tehokas ja kallis virtuaalikone, vaan paikalla oleva t2.micro käy mainiosti.

Tärkeä huomautus: käytä esimerkiksi kuvaa - Amazon Linux AMI 2018.03.0, siinä on vähemmän asetuksia Kinesis Agentin nopeaan käynnistämiseen.

Mene EC2-palveluun, luo uusi virtuaalikone, valitse haluamasi AMI tyypillä t2.micro, joka sisältyy Free Tier -tasoon:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Jotta juuri luotu virtuaalikone voisi olla vuorovaikutuksessa Kinesis-palvelun kanssa, sille on annettava siihen oikeudet. Paras tapa tehdä tämä on määrittää IAM-rooli. Siksi Vaihe 3: Määritä ilmentymän tiedot -näytössä sinun tulee valita Luo uusi IAM-rooli:

IAM-roolin luominen EC2:lle
Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Valitse avautuvasta ikkunasta, että luomme uutta roolia EC2:lle ja siirry Käyttöoikeudet-osioon:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Harjoitteluesimerkin avulla meidän ei tarvitse perehtyä kaikkiin resurssioikeuksien yksityiskohtaisen konfiguroinnin monimutkaisuuteen, joten valitsemme Amazonin ennalta määrittämät käytännöt: AmazonKinesisFullAccess ja CloudWatchFullAccess.

Annetaan tälle roolille jokin merkityksellinen nimi, esimerkiksi: EC2-KinesisStreams-FullAccess. Tuloksen pitäisi olla sama kuin alla olevassa kuvassa:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Kun olet luonut tämän uuden roolin, älä unohda liittää se luotuun virtuaalikoneen esiintymään:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Emme muuta tällä näytöllä mitään muuta ja siirrymme seuraaviin ikkunoihin.

Kiintolevyn asetukset voidaan jättää oletusasetuksiksi, samoin kuin tagit (joskin tunnisteita on hyvä käyttää, anna ilmentymälle ainakin nimi ja ilmaista ympäristö).

Nyt olemme Vaihe 6: Määritä suojausryhmä -välilehti, jossa sinun on luotava uusi tai määritettävä olemassa oleva suojausryhmä, jonka avulla voit muodostaa yhteyden ssh:n (portin 22) kautta ilmentymään. Valitse siellä Lähde -> Oma IP ja voit käynnistää ilmentymän.

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Heti kun se siirtyy käynnissä olevaan tilaan, voit yrittää muodostaa yhteyden siihen ssh:n kautta.

Jotta voit työskennellä Kinesis Agentin kanssa, sinun on kirjoitettava seuraavat komennot päätteeseen, kun olet muodostanut yhteyden koneeseen:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Luodaan kansio API-vastausten tallentamista varten:

sudo mkdir /var/log/airline_tickets

Ennen agentin käynnistämistä sinun on määritettävä sen kokoonpano:

sudo vim /etc/aws-kinesis/agent.json

Agent.json-tiedoston sisällön pitäisi näyttää tältä:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Kuten asetustiedostosta voidaan nähdä, agentti valvoo .log-tunnisteella varustettuja tiedostoja /var/log/airline_tickets/-hakemistossa, jäsentää ne ja siirtää ne lentolippujen tietovirtaan.

Käynnistämme palvelun uudelleen ja varmistamme, että se on toiminnassa:

sudo service aws-kinesis-agent restart

Lataa nyt Python-skripti, joka pyytää tietoja API:lta:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Api_caller.py-komentosarja pyytää tietoja Aviasalesilta ja tallentaa vastaanotetun vastauksen hakemistoon, jonka Kinesis-agentti tarkistaa. Tämän skriptin toteutus on melko tavallista, siellä on TicketsApi-luokka, jonka avulla voit vetää API asynkronisesti. Välitämme tunnuksella varustetun otsikon ja pyydämme parametreja tälle luokalle:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Testaa agentin oikeat asetukset ja toimivuus testaamalla api_caller.py-skripti:

sudo ./api_caller.py TOKEN

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Ja tarkastelemme työn tuloksia Agent-lokeissa ja Lentolippujen tietovirran Monitoring-välilehdellä:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Kuten näet, kaikki toimii ja Kinesis Agent lähettää tiedot onnistuneesti streamiin. Määritetään nyt kuluttaja.

Kinesis Data Analyticsin määrittäminen

Siirrytään koko järjestelmän keskeiseen osaan – luodaan Kinesis Data Analyticsiin uusi sovellus nimeltä kinesis_analytics_airlines_app:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Kinesis Data Analyticsin avulla voit suorittaa reaaliaikaista data-analytiikkaa Kinesis Streamsista SQL-kielellä. Se on täysin automaattinen skaalauspalvelu (toisin kuin Kinesis Streams), joka:

  1. voit luoda uusia virtoja (Output Stream) lähdetietojen pyyntöjen perusteella;
  2. tarjoaa virran, jossa on virheitä, jotka tapahtuivat sovellusten ollessa käynnissä (Error Stream);
  3. voi määrittää automaattisesti syötetietomallin (se voidaan määrittää manuaalisesti uudelleen tarvittaessa).

Tämä ei ole halpa palvelu - 0.11 USD per työtunti, joten sinun tulee käyttää sitä huolellisesti ja poistaa se, kun olet valmis.

Yhdistä sovellus tietolähteeseen:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Valitse stream, johon aiomme muodostaa yhteyden (airline_tickets):

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Seuraavaksi sinun on liitettävä uusi IAM-rooli, jotta sovellus voi lukea virrasta ja kirjoittaa streamiin. Tätä varten riittää, että et muuta mitään Käyttöoikeudet-lohkossa:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Pyydetään nyt tietokaavion löytämistä virrasta; napsauta "Discover schema" -painiketta. Tämän seurauksena IAM-rooli päivitetään (luodaan) ja skeeman tunnistus käynnistetään streamiin jo saapuneista tiedoista:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Nyt sinun on siirryttävä SQL-editoriin. Kun napsautat tätä painiketta, näkyviin tulee ikkuna, jossa sinua pyydetään käynnistämään sovellus - valitse mitä haluat käynnistää:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Lisää seuraava yksinkertainen kysely SQL-editori-ikkunaan ja napsauta Tallenna ja suorita SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Relaatiotietokannoissa työskentelet taulukoiden kanssa käyttämällä INSERT-käskyä tietueiden lisäämiseen ja SELECT-käskyä tietojen kyselyyn. Amazon Kinesis Data Analyticsissa työskentelet virtojen (STREAM:ien) ja pumppujen (PUMPien) kanssa – jatkuvat lisäyspyynnöt, jotka lisäävät tietoja sovelluksen yhdestä virrasta toiseen tietovirtaan.

Yllä esitetty SQL-kysely etsii Aeroflot-lippuja, joiden hinta on alle viisi tuhatta ruplaa. Kaikki nämä ehdot täyttävät tietueet sijoitetaan DESTINATION_SQL_STREAM-virtaan.

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Valitse Destination-lohkossa virta special_stream ja sovelluksen sisäisen virran nimi DESTINATION_SQL_STREAM -pudotusvalikosta:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Kaikkien manipulointien tuloksen pitäisi olla jotain alla olevan kuvan kaltaista:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus

SNS-aiheen luominen ja tilaaminen

Mene yksinkertaiseen ilmoituspalveluun ja luo sinne uusi aihe nimeltä Lentoyhtiöt:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Tilaa tämä aihe ja ilmoita matkapuhelinnumero, johon tekstiviesti-ilmoitukset lähetetään:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus

Luo taulukko DynamoDB:ssä

Luodaan DynamoDB:hen samanniminen taulukko, jotta voit tallentaa raakadatan lentolippuvirrasta. Ensisijaisena avaimena käytämme record_id:tä:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus

Lambda-funktion kerääjän luominen

Luodaan lambda-funktio nimeltä Collector, jonka tehtävänä on pollata lentolippujen stream ja jos sieltä löytyy uusia tietueita, lisää ne DynamoDB-taulukkoon. Ilmeisesti oletusoikeuksien lisäksi tällä lambdalla on oltava lukuoikeus Kinesis-tietovirtaan ja kirjoitusoikeus DynamoDB:hen.

Luodaan IAM-rooli keräilijän lambda-funktiolle
Luodaan ensin uusi IAM-rooli lambdalle nimeltä Lambda-TicketsProcessingRole:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Testiesimerkissä esikonfiguroidut AmazonKinesisReadOnlyAccess- ja AmazonDynamoDBFullAccess-käytännöt ovat varsin sopivia, kuten alla olevasta kuvasta näkyy:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus

Tämä lambda pitäisi käynnistää Kinesiksen laukaisulla, kun uusia merkintöjä tulee airline_streamiin, joten meidän on lisättävä uusi laukaisin:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Jää vain syöttää koodi ja tallentaa lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda-funktion ilmoituksen luominen

Toinen lambda-toiminto, joka valvoo toista streamia (special_stream) ja lähettää ilmoituksen SNS:lle, luodaan samalla tavalla. Siksi tällä lambdalla on oltava pääsy Kinesiksen lukemiseen ja viestien lähettämiseen tiettyyn SNS-aiheeseen, jotka sitten SNS-palvelu lähettää kaikille tämän aiheen tilaajille (sähköposti, tekstiviesti jne.).

IAM-roolin luominen
Ensin luodaan IAM-rooli Lambda-KinesisAlarm tälle lambdalle ja määritetään sitten tämä rooli luotavalle alarm_notifier lambdalle:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus

Tämän lambdan pitäisi toimia laukaisimessa, jotta uudet tietueet pääsevät special_streamiin, joten sinun on määritettävä liipaisin samalla tavalla kuin teimme Collector lambdalle.

Tämän lambdan konfiguroinnin helpottamiseksi esitellään uusi ympäristömuuttuja - TOPIC_ARN, johon sijoitamme Airlines-aiheen ANR:n (Amazon Recourse Names):

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Ja lisää lambda-koodi, se ei ole ollenkaan monimutkaista:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Näyttää siltä, ​​että tässä on manuaalinen järjestelmän konfigurointi valmis. Jäljelle jää vain testata ja varmistaa, että olemme määrittäneet kaiken oikein.

Ota käyttöön Terraform-koodista

Vaadittu valmistelu

terraform on erittäin kätevä avoimen lähdekoodin työkalu infrastruktuurin käyttöönottoon koodista. Sillä on oma syntaksi, joka on helppo oppia, ja siinä on monia esimerkkejä siitä, miten ja mitä otetaan käyttöön. Atom-editorissa tai Visual Studio Codessa on monia käteviä laajennuksia, jotka helpottavat työskentelyä Terraformin kanssa.

Voit ladata jakelun siten. Yksityiskohtainen analyysi kaikista Terraformin ominaisuuksista ei kuulu tämän artikkelin piiriin, joten rajoitamme vain pääkohtiin.

Kuinka ajaa

Projektin koko koodi on arkistossani. Kloonamme arkiston itsellemme. Ennen kuin aloitat, sinun on varmistettava, että AWS CLI on asennettu ja määritetty, koska... Terraform etsii tunnistetietoja ~/.aws/credentials-tiedostosta.

Hyvä käytäntö on ajaa plan-komento ennen koko infrastruktuurin käyttöönottoa nähdäksesi, mitä Terraform tällä hetkellä luo meille pilvessä:

terraform.exe plan

Sinua pyydetään antamaan puhelinnumero, johon ilmoitukset lähetetään. Sitä ei tarvitse syöttää tässä vaiheessa.

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Analysoituamme ohjelman toimintasuunnitelman voimme aloittaa resurssien luomisen:

terraform.exe apply

Tämän komennon lähettämisen jälkeen sinua pyydetään jälleen syöttämään puhelinnumero; valitse "kyllä", kun näyttöön tulee kysymys toimintojen suorittamisesta. Tämän avulla voit määrittää koko infrastruktuurin, suorittaa kaikki tarvittavat EC2-asetukset, ottaa käyttöön lambda-toimintoja jne.

Kun kaikki resurssit on luotu onnistuneesti Terraform-koodin avulla, sinun on mentävä Kinesis Analytics -sovelluksen yksityiskohtiin (valitettavasti en löytänyt kuinka tämä tehdään suoraan koodista).

Käynnistä sovellus:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Tämän jälkeen sinun on määritettävä sovelluksen sisäisen virran nimi valitsemalla avattavasta luettelosta:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Nyt kaikki on valmiina lähtöön.

Sovelluksen testaus

Riippumatta siitä, miten otit järjestelmän käyttöön, manuaalisesti tai Terraform-koodin avulla, se toimii samoin.

Kirjaudumme sisään SSH:n kautta EC2-virtuaalikoneeseen, johon Kinesis Agent on asennettu, ja suoritamme api_caller.py-komentosarjan

sudo ./api_caller.py TOKEN

Sinun tarvitsee vain odottaa tekstiviestiä numeroosi:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
SMS - viesti saapuu puhelimeen melkein minuutissa:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus
Jää nähtäväksi, tallennettiinko tietueet DynamoDB-tietokantaan myöhempää yksityiskohtaisempaa analysointia varten. Lentoliput-taulukko sisältää suunnilleen seuraavat tiedot:

Aviasales API -integraatio Amazon Kinesiksen kanssa ja palvelinton yksinkertaisuus

Johtopäätös

Työn aikana rakennettiin Amazon Kinesikseen perustuva online-tietojenkäsittelyjärjestelmä. Vaihtoehtoja Kinesis Agentin käyttämiseksi Kinesis Data Streamsin ja reaaliaikaisen analytiikan Kinesis Analyticsin kanssa SQL-komentoja käyttäen sekä Amazon Kinesiksen vuorovaikutusta muiden AWS-palvelujen kanssa harkittiin.

Otimme yllä olevan järjestelmän käyttöön kahdella tavalla: melko pitkällä manuaalisella ja nopealla Terraform-koodista.

Kaikki projektin lähdekoodi on saatavilla GitHub-arkistossani, suosittelen, että tutustut siihen.

Keskustelen mielelläni artikkelista, odotan kommenttejasi. Toivon rakentavaa kritiikkiä.

Toivotan teille menestystä!

Lähde: will.com

Lisää kommentti