Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio

Sveiki, Habr!

Ar jums patinka skraidyti lėktuvais? Man tai patinka, bet saviizoliacijos metu taip pat pamėgau analizuoti duomenis apie lėktuvų bilietus iš vieno žinomo šaltinio - Aviasales.

Šiandien analizuosime „Amazon Kinesis“ darbą, sukursime srautinio perdavimo sistemą su realaus laiko analize, kaip pagrindinę duomenų saugyklą įdiegsime „Amazon DynamoDB NoSQL“ duomenų bazę ir nustatysime SMS pranešimus apie įdomius bilietus.

Visos detalės yra po pjūviu! Pirmyn!

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio

įvedimas

Pavyzdžiui, mums reikia prieigos prie Aviasales API. Prieiga prie jos teikiama nemokamai ir be apribojimų; jums tereikia užsiregistruoti skiltyje „Kūrėjai“, kad gautumėte API prieigos raktą, kad galėtumėte pasiekti duomenis.

Pagrindinis šio straipsnio tikslas – suteikti bendrą supratimą apie informacijos srautinio perdavimo naudojimą AWS; atsižvelgiame į tai, kad naudojamos API grąžinti duomenys nėra griežtai atnaujinami ir yra perduodami iš talpyklos, kuri yra sudaryta remiantis Aviasales.ru ir Jetradar.com svetainių naudotojų paieškomis per pastarąsias 48 valandas.

Kinesis agentas, įdiegtas gamybos mašinoje, gautas per API, automatiškai išanalizuoja ir perduoda duomenis į norimą srautą per Kinesis Data Analytics. Neapdorota šio srauto versija bus parašyta tiesiai į parduotuvę. „DynamoDB“ įdiegta neapdorotų duomenų saugykla leis atlikti gilesnę bilietų analizę naudojant BI įrankius, tokius kaip „AWS Quick Sight“.

Mes apsvarstysime dvi visos infrastruktūros diegimo galimybes:

  • Rankinis – per AWS valdymo pultą;
  • „Terraform“ kodo infrastruktūra skirta tingiems automatams;

Sukurtos sistemos architektūra

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Naudojami komponentai:

  • Aviasales API — šios API grąžinti duomenys bus naudojami visam tolesniam darbui;
  • EC2 gamintojo pavyzdys — įprasta virtualioji mašina debesyje, kurioje bus generuojamas įvesties duomenų srautas:
    • Kinezijos agentas yra vietoje įrenginyje įdiegta „Java“ programa, kuri suteikia paprastą būdą rinkti ir siųsti duomenis į „Kinesis“ („Kinesis Data Streams“ arba „Kinesis Firehose“). Agentas nuolat stebi failų rinkinį nurodytuose kataloguose ir siunčia naujus duomenis į Kinesis;
    • API skambintojo scenarijus — Python scenarijus, kuris pateikia užklausas API ir įdeda atsakymą į aplanką, kurį stebi Kinesis Agent;
  • Kinesis duomenų srautai — duomenų srautinio perdavimo realiuoju laiku paslauga su plačiomis mastelio keitimo galimybėmis;
  • Kinezinė analizė yra paslauga be serverio, kuri supaprastina srautinio perdavimo duomenų analizę realiuoju laiku. „Amazon Kinesis Data Analytics“ sukonfigūruoja programos išteklius ir automatiškai keičia mastelį, kad būtų galima apdoroti bet kokį gaunamų duomenų kiekį;
  • „AWS Lambda“ — paslauga, leidžianti paleisti kodą nekuriant atsarginės kopijos ir nenustatant serverių. Visa skaičiavimo galia automatiškai keičiama kiekvienam skambučiui;
  • „Amazon DynamoDB“ – Raktų ir reikšmių porų ir dokumentų duomenų bazė, kuri užtikrina mažesnę nei 10 milisekundžių delsą, kai ji veikia bet kokiu mastu. Kai naudojate DynamoDB, jums nereikia teikti, pataisyti ar valdyti jokių serverių. „DynamoDB“ automatiškai keičia lenteles, kad sureguliuotų turimų išteklių kiekį ir išlaikytų aukštą našumą. Nereikalingas sistemos administravimas;
  • „Amazon SNS“ - visiškai valdoma paslauga, skirta pranešimų siuntimui naudojant leidėjo ir abonento (Pub/Sub) modelį, su kuria galite atskirti mikropaslaugas, paskirstytas sistemas ir programas be serverių. SNS gali būti naudojamas siunčiant informaciją galutiniams vartotojams per mobiliuosius pranešimus, SMS žinutes ir el.

Pradinis mokymas

Norėdami imituoti duomenų srautą, nusprendžiau naudoti „Aviasales“ API grąžintą informaciją apie lėktuvo bilietus. IN dokumentacija gana platus įvairių metodų sąrašas, paimkime vieną iš jų – „Mėnesio kainų kalendorių“, kuriame pateikiamos kainos kiekvienai mėnesio dienai, sugrupuotos pagal pervedimų skaičių. Jei užklausoje nenurodysite paieškos mėnesio, informacija bus grąžinta už kitą mėnesį po einamojo.

Taigi, užsiregistruokime ir gaukime žetoną.

Toliau pateiktas prašymo pavyzdys:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Aukščiau pateiktas duomenų gavimo iš API metodas, užklausoje nurodant prieigos raktą, veiks, bet aš norėčiau perduoti prieigos raktą per antraštę, todėl šį metodą naudosime api_caller.py scenarijuje.

Atsakymo pavyzdys:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Aukščiau pateiktas pavyzdinis API atsakymas rodo bilietą iš Sankt Peterburgo į Puką... Oi, kokia svajonė...
Kadangi esu iš Kazanės, o Puketas dabar „tik svajonė“, tai paieškokime bilietų iš Sankt Peterburgo į Kazanę.

Daroma prielaida, kad jau turite AWS paskyrą. Iš karto noriu atkreipti ypatingą dėmesį į tai, kad Kinesis ir pranešimų siuntimas SMS žinutėmis neįeina į metinį Nemokama pakopa (nemokamas naudojimas). Tačiau net nepaisant to, turint omenyje porą dolerių, visiškai įmanoma sukurti siūlomą sistemą ir su ja žaisti. Ir, žinoma, nepamirškite ištrinti visų išteklių, kai jie nebereikalingi.

Laimei, „DynamoDb“ ir „Lambda“ funkcijos bus nemokamos, jei pasieksime nemokamų mėnesinių limitų. Pavyzdžiui, „DynamoDB“: 25 GB saugyklos, 25 WCU/RCU ir 100 milijonų užklausų. Ir milijonas lambda funkcijos skambučių per mėnesį.

Rankinis sistemos diegimas

Kinesis duomenų srautų nustatymas

Eikime į „Kinesis Data Streams“ paslaugą ir sukurkime du naujus srautus, kiekvienam po vieną fragmentą.

Kas yra skeveldra?
„Shard“ yra pagrindinis „Amazon Kinesis“ srauto duomenų perdavimo įrenginys. Vienas segmentas užtikrina įvesties duomenų perdavimą 1 MB/s greičiu ir išvesties duomenų perdavimą 2 MB/s greičiu. Vienas segmentas palaiko iki 1000 PUT įrašų per sekundę. Kuriant duomenų srautą reikia nurodyti reikiamą segmentų skaičių. Pavyzdžiui, galite sukurti duomenų srautą su dviem segmentais. Šis duomenų srautas užtikrins įvesties duomenų perdavimą 2 MB/s greičiu ir išvesties duomenų perdavimą 4 MB/s greičiu, palaikydamas iki 2000 PUT įrašų per sekundę.

Kuo daugiau šukių jūsų sraute, tuo didesnis jo pralaidumas. Iš principo srautai mastuojami taip – ​​pridedant šukes. Bet kuo daugiau šukių turite, tuo didesnė kaina. Kiekviena skeveldra kainuoja 1,5 cento už valandą ir papildomai 1.4 cento už kiekvieną milijoną PUT naudingosios apkrovos vienetų.

Sukurkime naują srautą su pavadinimu Lėktuvo bilietai, jam pakaks 1 šukės:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Dabar sukurkime kitą giją su pavadinimu special_stream:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio

Gamintojo sąranka

Norint išanalizuoti užduotį, pakanka naudoti įprastą EC2 egzempliorių kaip duomenų gamintoją. Tai nebūtinai turi būti galinga, brangi virtuali mašina; tiks ir taškas t2.micro.

Svarbi pastaba: pavyzdžiui, turėtumėte naudoti vaizdą - Amazon Linux AMI 2018.03.0, jame yra mažiau nustatymų, kad greitai paleistumėte Kinesis Agent.

Eikite į EC2 paslaugą, sukurkite naują virtualią mašiną, pasirinkite norimą AMI su tipu t2.micro, kuris yra įtrauktas į nemokamą pakopą:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Kad naujai sukurta virtuali mašina galėtų bendrauti su Kinesis paslauga, jai turi būti suteiktos tam teisės. Geriausias būdas tai padaryti yra priskirti IAM vaidmenį. Todėl ekrane 3 veiksmas: konfigūruoti išsamią egzemplioriaus informaciją turėtumėte pasirinkti Sukurkite naują IAM vaidmenį:

EC2 IAM vaidmens sukūrimas
Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Atsidariusiame lange pasirinkite, kad kuriame naują EC2 vaidmenį, ir eikite į skyrių Leidimai:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Naudodamiesi mokymo pavyzdžiu, mums nereikia gilintis į visas detalios išteklių teisių konfigūracijos subtilybes, todėl pasirinksime „Amazon“ iš anksto sukonfigūruotas strategijas: AmazonKinesisFullAccess ir CloudWatchFullAccess.

Suteikime prasmingą šio vaidmens pavadinimą, pavyzdžiui: EC2-KinesisStreams-FullAccess. Rezultatas turėtų būti toks pat, kaip parodyta paveikslėlyje žemiau:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Sukūrę šį naują vaidmenį, nepamirškite jo pridėti prie sukurtos virtualios mašinos egzemplioriaus:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Šiame ekrane daugiau nieko nekeičiame ir pereiname prie kitų langų.

Kietojo disko nustatymus galima palikti kaip numatytuosius, taip pat žymes (nors gera praktika naudoti žymas, bent jau suteikti egzemplioriui pavadinimą ir nurodyti aplinką).

Dabar esame skirtuke 6 veiksmas: konfigūruoti saugos grupę, kur reikia sukurti naują arba nurodyti esamą saugos grupę, kuri leidžia prisijungti per ssh (22 prievadą) prie egzemplioriaus. Ten pasirinkite Šaltinis -> Mano IP ir galite paleisti egzempliorių.

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Kai tik jis persijungia į veikimo būseną, galite pabandyti prisijungti prie jo per ssh.

Kad galėtumėte dirbti su Kinesis Agent, sėkmingai prisijungę prie aparato, terminale turite įvesti šias komandas:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Sukurkime aplanką API atsakymams išsaugoti:

sudo mkdir /var/log/airline_tickets

Prieš paleisdami agentą, turite sukonfigūruoti jo konfigūraciją:

sudo vim /etc/aws-kinesis/agent.json

Failo agent.json turinys turėtų atrodyti taip:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Kaip matyti iš konfigūracijos failo, agentas stebės failus su plėtiniu .log kataloge /var/log/airline_tickets/, juos analizuos ir perkels į airline_tickets srautą.

Iš naujo paleidžiame paslaugą ir įsitikiname, kad ji veikia:

sudo service aws-kinesis-agent restart

Dabar atsisiųskite Python scenarijų, kuris paprašys duomenų iš API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Api_caller.py scenarijus prašo duomenų iš Aviasales ir išsaugo gautą atsakymą kataloge, kurį nuskaito Kinesis agentas. Šio scenarijaus įgyvendinimas gana standartinis, yra TicketsApi klasė, leidžianti asinchroniškai traukti API. Šiai klasei perduodame antraštę su prieigos raktu ir prašome parametrų:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Norėdami patikrinti teisingus agento nustatymus ir funkcionalumą, paleiskite api_caller.py scenarijų:

sudo ./api_caller.py TOKEN

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Ir mes žiūrime į darbo rezultatus agento žurnaluose ir skirtuke Stebėjimas duomenų sraute airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Kaip matote, viskas veikia ir Kinesis Agent sėkmingai siunčia duomenis į srautą. Dabar sukonfigūruokime vartotoją.

Kinesis duomenų analizės nustatymas

Pereikime prie centrinio visos sistemos komponento – sukurkite naują programą Kinesis Data Analytics pavadinimu kinesis_analytics_airlines_app:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
„Kinesis Data Analytics“ leidžia atlikti „Kinesis Streams“ duomenų analizę realiuoju laiku naudojant SQL kalbą. Tai visiškai automatinio mastelio keitimo paslauga (skirtingai nei Kinesis Streams), kuri:

  1. leidžia kurti naujus srautus (Output Stream) pagal duomenų šaltinio užklausas;
  2. teikia srautą su klaidomis, kurios įvyko veikiant programoms (klaidų srautas);
  3. gali automatiškai nustatyti įvesties duomenų schemą (jei reikia, ją galima iš naujo apibrėžti rankiniu būdu).

Tai nėra pigi paslauga – 0.11 USD už valandą darbo, todėl ja naudotis reikia atsargiai ir baigus ištrinti.

Prijunkite programą prie duomenų šaltinio:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Pasirinkite srautą, prie kurio prisijungsime (airline_tickets):

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Tada turite pridėti naują IAM vaidmenį, kad programa galėtų skaityti iš srauto ir rašyti į srautą. Norėdami tai padaryti, pakanka nieko nekeisti prieigos leidimų bloke:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Dabar paprašykime aptikti duomenų schemą sraute; norėdami tai padaryti, spustelėkite mygtuką „Aptikti schemą“. Dėl to IAM vaidmuo bus atnaujintas (bus sukurtas naujas) ir bus paleistas schemos aptikimas iš duomenų, kurie jau pasiekė srautą:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Dabar reikia eiti į SQL redaktorių. Spustelėjus šį mygtuką, pasirodys langas, kuriame bus prašoma paleisti programą – pasirinkite, ką norite paleisti:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Įdėkite šią paprastą užklausą į SQL redaktoriaus langą ir spustelėkite Įrašyti ir paleisti SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Reliacinėse duomenų bazėse dirbate su lentelėmis naudodami INSERT sakinius, kad pridėtumėte įrašus, ir SELECT sakinį, kad pateiktumėte duomenų užklausą. Naudodami „Amazon Kinesis Data Analytics“ dirbate su srautais (STREAM) ir siurbliais (PUMP) – nuolatinėmis įterpimo užklausomis, kurios įterpia duomenis iš vieno programos srauto į kitą srautą.

Aukščiau pateikta SQL užklausa ieško „Aeroflot“ bilietų, kurių kaina mažesnė nei penki tūkstančiai rublių. Visi šias sąlygas atitinkantys įrašai bus patalpinti į DESTINATION_SQL_STREAM srautą.

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Bloke „Paskirties vieta“ pasirinkite srautą special_stream, o išplečiamajame sąraše srauto pavadinimas programoje DESTINATION_SQL_STREAM:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Visų manipuliacijų rezultatas turėtų būti panašus į toliau pateiktą paveikslėlį:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio

SNS temos kūrimas ir prenumerata

Eikite į paprastą pranešimų tarnybą ir sukurkite naują temą pavadinimu „Avialinijos“:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Prenumeruokite šią temą ir nurodykite mobiliojo telefono numerį, kuriuo bus siunčiami SMS pranešimai:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio

Sukurkite lentelę „DynamoDB“.

Norėdami išsaugoti neapdorotus duomenis iš jų airline_tickets srauto, sukurkime DynamoDB lentelę tuo pačiu pavadinimu. Kaip pirminį raktą naudosime record_id:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio

Lambda funkcijų rinktuvo sukūrimas

Sukurkime lambda funkciją Collector, kurios užduotis bus apklausti airline_tickets srautą ir, jei ten bus rasta naujų įrašų, įterpti šiuos įrašus į DynamoDB lentelę. Akivaizdu, kad, be numatytųjų teisių, ši lambda turi turėti skaitymo prieigą prie Kinesis duomenų srauto ir rašymo prieigą prie DynamoDB.

IAM vaidmens sukūrimas kolektoriaus lambda funkcijai
Pirmiausia sukurkime naują lambda IAM vaidmenį, pavadintą Lambda-TicketsProcessingRole:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Bandomajam pavyzdžiui iš anksto sukonfigūruotos „AmazonKinesisReadOnlyAccess“ ir „AmazonDynamoDBFullAccess“ strategijos yra gana tinkamos, kaip parodyta paveikslėlyje žemiau:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio

Ši lambda turėtų būti paleista naudojant Kinesis paleidiklį, kai nauji įrašai patenka į airline_stream, todėl turime pridėti naują trigerį:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Belieka įvesti kodą ir išsaugoti lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda funkcijos pranešėjo sukūrimas

Panašiai kuriama ir antroji lambda funkcija, kuri stebės antrąjį srautą (special_stream) ir siųs pranešimą SNS. Todėl ši lambda turi turėti prieigą skaityti iš Kinesis ir siųsti žinutes į nurodytą SNS temą, kurią vėliau SNS tarnyba išsiųs visiems šios temos prenumeratoriams (el. paštu, SMS ir pan.).

IAM vaidmens sukūrimas
Pirmiausia sukuriame šios lambda IAM vaidmenį Lambda-KinesisAlarm, o tada priskiriame šį vaidmenį kuriamai alarm_notifier lambda:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio

Ši lambda turėtų veikti su trigeriu, kad nauji įrašai patektų į special_stream, todėl turite sukonfigūruoti trigerį taip pat, kaip tai padarėme su Collector lambda.

Kad būtų lengviau konfigūruoti šią lambda, pristatykime naują aplinkos kintamąjį - TOPIC_ARN, kuriame pateikiame oro linijų temos ANR (Amazon Recourse Names):

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Ir įdėkite lambda kodą, tai visai nesudėtinga:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Atrodo, kad čia baigtas rankinis sistemos konfigūravimas. Belieka tik išbandyti ir įsitikinti, kad viską sukonfigūravome teisingai.

Diegti iš Terraform kodo

Būtinas pasiruošimas

Terraformas yra labai patogus atvirojo kodo įrankis diegti infrastruktūrą iš kodo. Ji turi savo sintaksę, kurią lengva išmokti, ir turi daug pavyzdžių, kaip ir ką įdiegti. „Atom“ redaktorius arba „Visual Studio Code“ turi daug patogių įskiepių, palengvinančių darbą su „Terraform“.

Galite atsisiųsti platinimą taigi. Išsami visų Terraform galimybių analizė nepatenka į šio straipsnio taikymo sritį, todėl apsiribosime pagrindiniais dalykais.

Kaip pradėti

Visas projekto kodas yra mano saugykloje. Mes klonuojame saugyklą sau. Prieš pradėdami, turite įsitikinti, kad AWS CLI yra įdiegtas ir sukonfigūruotas, nes... „Terraform“ kredencialų ieškos faile ~/.aws/credentials.

Gera praktika yra paleisti plano komandą prieš diegiant visą infrastruktūrą, kad pamatytumėte, ką Terraform šiuo metu mums kuria debesyje:

terraform.exe plan

Būsite paraginti įvesti telefono numerį, kuriuo norite siųsti pranešimus. Šiame etape jo įvesti nebūtina.

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Išanalizavę programos veikimo planą, galime pradėti kurti išteklius:

terraform.exe apply

Išsiuntus šią komandą, jūsų vėl bus paprašyta įvesti telefono numerį; surinkite „taip“, kai pasirodys klausimas apie faktinį veiksmų atlikimą. Tai leis jums susikurti visą infrastruktūrą, atlikti visą reikiamą EC2 konfigūraciją, diegti lambda funkcijas ir kt.

Po to, kai visi ištekliai buvo sėkmingai sukurti naudojant Terraform kodą, turite įsigilinti į Kinesis Analytics programos detales (deja, neradau, kaip tai padaryti tiesiai iš kodo).

Paleiskite programą:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Po to turite aiškiai nustatyti srauto pavadinimą programoje, pasirinkdami išskleidžiamajame sąraše:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Dabar viskas paruošta eiti.

Programos testavimas

Nepriklausomai nuo to, kaip įdiegėte sistemą rankiniu būdu ar naudodami Terraform kodą, ji veiks taip pat.

Prisijungiame per SSH prie virtualios mašinos EC2, kurioje įdiegta Kinesis Agent, ir paleidžiame api_caller.py scenarijų

sudo ./api_caller.py TOKEN

Viskas, ką jums reikia padaryti, tai laukti SMS žinutės jūsų numeriu:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
SMS - žinutė į telefoną ateina beveik per 1 minutę:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio
Belieka išsiaiškinti, ar įrašai buvo išsaugoti „DynamoDB“ duomenų bazėje, kad būtų galima atlikti išsamesnę analizę. Lėktuvų bilietų lentelėje yra maždaug šie duomenys:

Aviasales API integracija su Amazon Kinesis ir paprastumas be serverio

išvada

Atliekant darbą buvo sukurta internetinė duomenų apdorojimo sistema Amazon Kinesis pagrindu. Apsvarstytos galimybės naudoti Kinesis Agent kartu su Kinesis duomenų srautais ir realaus laiko analize Kinesis Analytics naudojant SQL komandas, taip pat Amazon Kinesis sąveika su kitomis AWS paslaugomis.

Aukščiau pateiktą sistemą įdiegėme dviem būdais: gana ilgą rankinį ir greitą iš Terraform kodo.

Galimas visas projekto šaltinio kodas mano „GitHub“ saugykloje, siūlau su ja susipažinti.

Džiaugiuosi galėdamas aptarti straipsnį, laukiu jūsų komentarų. Tikiuosi konstruktyvios kritikos.

Linkiu sėkmės!

Šaltinis: www.habr.com

Добавить комментарий