Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server

Hej Habr!

A ju pëlqejnë aeroplanët fluturues? Më pëlqen, por gjatë izolimit u dashurova gjithashtu me analizimin e të dhënave për biletat ajrore nga një burim i mirënjohur - Aviasales.

Sot do të analizojmë punën e Amazon Kinesis, do të ndërtojmë një sistem transmetimi me analitikë në kohë reale, do të instalojmë bazën e të dhënave Amazon DynamoDB NoSQL si ruajtjen kryesore të të dhënave dhe do të vendosim njoftime SMS për bileta interesante.

Të gjitha detajet janë nën prerje! Shkoni!

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server

Paraqitje

Për shembull, ne kemi nevojë për qasje në Aviasales API. Qasja në të ofrohet pa pagesë dhe pa kufizime; thjesht duhet të regjistroheni në seksionin "Zhvilluesit" për të marrë tokenin tuaj API për të hyrë në të dhënat.

Qëllimi kryesor i këtij artikulli është të japë një kuptim të përgjithshëm të përdorimit të transmetimit të informacionit në AWS; ne marrim parasysh që të dhënat e kthyera nga API-ja e përdorur nuk janë rreptësisht të përditësuara dhe transmetohen nga cache, e cila është formuar në bazë të kërkimeve nga përdoruesit e faqeve Aviasales.ru dhe Jetradar.com për 48 orët e fundit.

Kinesis-agent, i instaluar në makinën prodhuese, i marrë nëpërmjet API do të analizojë dhe transmetojë automatikisht të dhënat në rrjedhën e dëshiruar nëpërmjet Kinesis Data Analytics. Versioni i papërpunuar i këtij transmetimi do të shkruhet direkt në dyqan. Ruajtja e të dhënave të papërpunuara të vendosura në DynamoDB do të lejojë analiza më të thella të biletave përmes mjeteve të BI, siç është AWS Quick Sight.

Ne do të shqyrtojmë dy opsione për vendosjen e të gjithë infrastrukturës:

  • Manual - nëpërmjet AWS Management Console;
  • Infrastruktura nga kodi Terraform është për automatët dembelë;

Arkitektura e sistemit të zhvilluar

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Komponentët e përdorur:

  • Aviasales API — të dhënat e kthyera nga kjo API do të përdoren për të gjithë punën e mëvonshme;
  • Shembulli i Prodhuesit EC2 — një makinë virtuale e rregullt në cloud në të cilën do të gjenerohet rrjedha e të dhënave hyrëse:
    • Agjent Kinesis është një aplikacion Java i instaluar lokalisht në makinë që ofron një mënyrë të thjeshtë për të mbledhur dhe dërguar të dhëna te Kinesis (Kinesis Data Streams ose Kinesis Firehose). Agjenti monitoron vazhdimisht një grup skedarësh në drejtoritë e specifikuara dhe dërgon të dhëna të reja në Kinesis;
    • Skript i thirrësit API — Një skript Python që bën kërkesa në API dhe e vendos përgjigjen në një dosje që monitorohet nga Kinesis Agent;
  • Rrjedhat e të dhënave Kinesis — shërbim i transmetimit të të dhënave në kohë reale me aftësi të gjerë shkallëzimi;
  • Kinesis Analytics është një shërbim pa server që thjeshton analizën e të dhënave të transmetimit në kohë reale. Amazon Kinesis Data Analytics konfiguron burimet e aplikacionit dhe shkallëzohet automatikisht për të trajtuar çdo vëllim të të dhënave hyrëse;
  • AWS Lambda — një shërbim që ju lejon të ekzekutoni kodin pa rezervuar ose konfiguruar serverë. E gjithë fuqia kompjuterike shkallëzohet automatikisht për çdo telefonatë;
  • Amazon DynamoDB - Një bazë të dhënash me çifte dhe dokumente me vlerë kyçe që siguron vonesë prej më pak se 10 milisekonda kur funksionon në çdo shkallë. Kur përdorni DynamoDB, nuk keni nevojë të siguroni, korrigjoni ose menaxhoni ndonjë server. DynamoDB shkallëzon automatikisht tabelat për të rregulluar sasinë e burimeve të disponueshme dhe për të ruajtur performancën e lartë. Nuk kërkohet administrim i sistemit;
  • Amazon SNS - një shërbim plotësisht i menaxhuar për dërgimin e mesazheve duke përdorur modelin botues-pajtimtar (Pub/Sub), me të cilin mund të izoloni mikroshërbimet, sistemet e shpërndara dhe aplikacionet pa server. SNS mund të përdoret për të dërguar informacione tek përdoruesit fundorë përmes njoftimeve të shtytjes celulare, mesazheve SMS dhe emaileve.

Trajnimi fillestar

Për të imituar rrjedhën e të dhënave, vendosa të përdor informacionin e biletave të linjës ajrore të kthyer nga API Aviasales. NË dokumentacionin një listë mjaft e gjerë e metodave të ndryshme, le të marrim njërën prej tyre - "Kalendari i Çmimeve Mujore", i cili kthen çmimet për çdo ditë të muajit, të grupuara sipas numrit të transfertave. Nëse nuk specifikoni muajin e kërkimit në kërkesë, informacioni do të kthehet për muajin pasues të atij aktual.

Pra, le të regjistrohemi dhe të marrim shenjën tonë.

Një shembull i kërkesës është më poshtë:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Metoda e mësipërme e marrjes së të dhënave nga API duke specifikuar një shenjë në kërkesë do të funksionojë, por unë preferoj ta kaloj tokenin e aksesit përmes kokës, kështu që ne do ta përdorim këtë metodë në skriptin api_caller.py.

Shembulli i përgjigjes:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Shembulli i përgjigjes API më lart tregon një biletë nga Shën Petersburg në Phuk... Oh, çfarë ëndrre...
Meqenëse unë jam nga Kazani, dhe Phuket tani është "vetëm një ëndërr", le të kërkojmë bileta nga Shën Petersburg në Kazan.

Supozon që ju tashmë keni një llogari AWS. Dëshiroj të tërheq menjëherë vëmendje të veçantë për faktin se Kinesis dhe dërgimi i njoftimeve me SMS nuk përfshihen në vjetorin Niveli i lirë (përdorim falas). Por edhe përkundër kësaj, me disa dollarë në mendje, është mjaft e mundur të ndërtohet sistemi i propozuar dhe të luhet me të. Dhe, sigurisht, mos harroni të fshini të gjitha burimet pasi ato nuk janë më të nevojshme.

Për fat të mirë, funksionet DynamoDb dhe lambda do të jenë falas për ne nëse përmbushim limitet tona mujore falas. Për shembull, për DynamoDB: 25 GB hapësirë ​​ruajtëse, 25 WCU/RCU dhe 100 milionë pyetje. Dhe një milion telefonata me funksion lambda në muaj.

Vendosja manuale e sistemit

Vendosja e rrjedhave të të dhënave Kinesis

Le të shkojmë te shërbimi Kinesis Data Streams dhe të krijojmë dy transmetime të reja, një copë për secilën.

Çfarë është një copëz?
Një copëz është njësia bazë e transferimit të të dhënave të një rryme Amazon Kinesis. Një segment siguron transferimin e të dhënave hyrëse me një shpejtësi prej 1 MB/s dhe transferimin e të dhënave dalëse me një shpejtësi prej 2 MB/s. Një segment mbështet deri në 1000 hyrje PUT në sekondë. Kur krijoni një rrjedhë të dhënash, duhet të specifikoni numrin e kërkuar të segmenteve. Për shembull, mund të krijoni një rrjedhë të dhënash me dy segmente. Ky transmetim i të dhënave do të sigurojë transferimin e të dhënave hyrëse me 2 MB/s dhe transferimin e të dhënave dalëse me 4 MB/s, duke mbështetur deri në 2000 regjistrime PUT për sekondë.

Sa më shumë copëza në transmetimin tuaj, aq më i madh është xhiroja e tij. Në parim, kështu shkallëzohen rrjedhat - duke shtuar copëza. Por sa më shumë copëza të keni, aq më i lartë është çmimi. Çdo copëz kushton 1,5 cent në orë dhe 1.4 cent shtesë për çdo milion njësi ngarkese PUT.

Le të krijojmë një transmetim të ri me emrin biletat e linjës ajrore, 1 copë do të jetë e mjaftueshme për të:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Tani le të krijojmë një fije tjetër me emrin transmetim_e veçantë:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server

Konfigurimi i prodhuesit

Për të analizuar një detyrë, mjafton të përdorni një shembull të rregullt EC2 si prodhues të të dhënave. Nuk duhet të jetë një makinë virtuale e fuqishme dhe e shtrenjtë; një t2.micro spot do të funksionojë mirë.

Shënim i rëndësishëm: për shembull, duhet të përdorni imazhin - Amazon Linux AMI 2018.03.0, ai ka më pak cilësime për lëshimin e shpejtë të Agjentit Kinesis.

Shkoni te shërbimi EC2, krijoni një makinë të re virtuale, zgjidhni AMI-në e dëshiruar me llojin t2.micro, e cila përfshihet në Nivelin Falas:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Në mënyrë që makina virtuale e krijuar rishtazi të jetë në gjendje të ndërveprojë me shërbimin Kinesis, duhet t'i jepen të drejta për ta bërë këtë. Mënyra më e mirë për ta bërë këtë është të caktoni një Rol IAM. Prandaj, në ekranin e Hapit 3: Konfiguro detajet e shembullit, duhet të zgjidhni Krijo një rol të ri IAM:

Krijimi i një roli IAM për EC2
Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Në dritaren që hapet, zgjidhni që po krijojmë një rol të ri për EC2 dhe shkoni te seksioni Permissions:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Duke përdorur shembullin e trajnimit, nuk kemi pse të hyjmë në të gjitha ndërlikimet e konfigurimit të grimcuar të të drejtave të burimeve, kështu që ne do të zgjedhim politikat e para-konfiguruara nga Amazon: AmazonKinesisFullAccess dhe CloudWatchFullAccess.

Le të japim një emër kuptimplotë për këtë rol, për shembull: EC2-KinesisStreams-FullAccess. Rezultati duhet të jetë i njëjtë siç tregohet në foton më poshtë:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Pas krijimit të këtij roli të ri, mos harroni ta bashkëngjitni atë në shembullin e krijuar të makinës virtuale:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Ne nuk ndryshojmë asgjë tjetër në këtë ekran dhe kalojmë në dritaret e ardhshme.

Cilësimet e diskut mund të lihen si parazgjedhje, si dhe etiketat (megjithëse është praktikë e mirë të përdorni etiketa, të paktën t'i jepni një emër shembullit dhe të tregoni mjedisin).

Tani jemi në hapin 6: Konfiguro skedën e Grupit të Sigurisë, ku duhet të krijoni një të re ose të specifikoni grupin tuaj ekzistues të Sigurisë, i cili ju lejon të lidheni nëpërmjet ssh (porti 22) me shembullin. Zgjidhni Burimi -> IP-ja ime atje dhe mund të nisni shembullin.

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Sapo kalon në statusin e ekzekutimit, mund të provoni të lidheni me të përmes ssh.

Për të qenë në gjendje të punoni me Kinesis Agent, pas lidhjes me sukses me makinën, duhet të futni komandat e mëposhtme në terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Le të krijojmë një dosje për të ruajtur përgjigjet e API:

sudo mkdir /var/log/airline_tickets

Para fillimit të agjentit, duhet të konfiguroni konfigurimin e tij:

sudo vim /etc/aws-kinesis/agent.json

Përmbajtja e skedarit agent.json duhet të duket kështu:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Siç mund të shihet nga skedari i konfigurimit, agjenti do të monitorojë skedarët me ekstensionin .log në drejtorinë /var/log/airline_tickets/, do t'i analizojë ato dhe do t'i transferojë në rrjedhën e airline_tickets.

Ne rinisim shërbimin dhe sigurojmë që ai të funksionojë:

sudo service aws-kinesis-agent restart

Tani le të shkarkojmë skriptin Python që do të kërkojë të dhëna nga API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skripti api_caller.py kërkon të dhëna nga Aviasales dhe ruan përgjigjen e marrë në drejtorinë që skanon agjenti Kinesis. Zbatimi i këtij skripti është mjaft standard, ekziston një klasë TicketsApi, ju lejon të tërheqni në mënyrë asinkrone API-në. Ne kalojmë një kokë me një shenjë dhe kërkojmë parametra në këtë klasë:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Për të testuar cilësimet dhe funksionalitetin e saktë të agjentit, le të testojmë skriptin api_caller.py:

sudo ./api_caller.py TOKEN

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Dhe ne shikojmë rezultatin e punës në regjistrat e agjentëve dhe në skedën Monitorimi në rrjedhën e të dhënave të biletave ajrore:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Siç mund ta shihni, gjithçka funksionon dhe Agjenti Kinesis dërgon me sukses të dhënat në transmetim. Tani le të konfigurojmë konsumatorin.

Konfigurimi i Kinesis Data Analytics

Le të kalojmë te komponenti qendror i të gjithë sistemit - krijoni një aplikacion të ri në Kinesis Data Analytics të quajtur kinesis_analytics_airlines_app:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Kinesis Data Analytics ju lejon të kryeni analiza të të dhënave në kohë reale nga Kinesis Streams duke përdorur gjuhën SQL. Është një shërbim plotësisht automatik (ndryshe nga Kinesis Streams) që:

  1. ju lejon të krijoni transmetime të reja (Output Stream) bazuar në kërkesat për burimin e të dhënave;
  2. ofron një transmetim me gabime që kanë ndodhur gjatë ekzekutimit të aplikacioneve (Error Stream);
  3. mund të përcaktojë automatikisht skemën e të dhënave hyrëse (mund të ripërcaktohet manualisht nëse është e nevojshme).

Ky nuk është një shërbim i lirë - 0.11 USD për orë pune, ndaj duhet ta përdorni me kujdes dhe ta fshini kur të keni mbaruar.

Le ta lidhim aplikacionin me burimin e të dhënave:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Zgjidhni transmetimin me të cilin do të lidhemi (airline_biletat):

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Më pas, duhet të bashkëngjitni një Rol të ri IAM në mënyrë që aplikacioni të lexojë nga transmetimi dhe të shkruajë në transmetim. Për ta bërë këtë, mjafton të mos ndryshoni asgjë në bllokun e lejeve të aksesit:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Tani le të kërkojmë zbulimin e skemës së të dhënave në transmetim; për ta bërë këtë, klikoni në butonin "Zbulo skemën". Si rezultat, roli IAM do të përditësohet (do të krijohet një i ri) dhe zbulimi i skemës do të nisë nga të dhënat që kanë mbërritur tashmë në transmetim:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Tani duhet të shkoni te redaktori SQL. Kur klikoni në këtë buton, do të shfaqet një dritare që ju kërkon të hapni aplikacionin - zgjidhni atë që dëshironi të hapni:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Futni pyetjen e mëposhtme të thjeshtë në dritaren e redaktuesit SQL dhe klikoni Ruaj dhe Ekzekuto SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Në bazat e të dhënave relacionale, ju punoni me tabela duke përdorur deklaratat INSERT për të shtuar regjistrime dhe një deklaratë SELECT për të kërkuar të dhënat. Në Amazon Kinesis Data Analytics, ju punoni me rryma (STREAM) dhe pompa (PUMP)—kërkesa të vazhdueshme të futjes që futin të dhëna nga një transmetim në një aplikacion në një transmetim tjetër.

Pyetja SQL e paraqitur më sipër kërkon për biletat e Aeroflot me një kosto nën pesë mijë rubla. Të gjitha regjistrimet që plotësojnë këto kushte do të vendosen në transmetimin DESTINATION_SQL_STREAM.

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Në bllokun Destinacion, zgjidhni transmetimin special_stream dhe në listën rënëse të emrit të transmetimit brenda aplikacionit DESTINATION_SQL_STREAM:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Rezultati i të gjitha manipulimeve duhet të jetë diçka e ngjashme me foton më poshtë:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server

Krijimi dhe abonimi në një temë SNS

Shkoni te Shërbimi i Njoftimit të Thjeshtë dhe krijoni një temë të re atje me emrin Airlines:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Abonohuni në këtë temë dhe tregoni numrin e telefonit celular në të cilin do të dërgohen njoftimet SMS:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server

Krijoni një tabelë në DynamoDB

Për të ruajtur të dhënat e papërpunuara nga rryma e tyre airline_tickets, le të krijojmë një tabelë në DynamoDB me të njëjtin emër. Ne do të përdorim record_id si çelësin kryesor:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server

Krijimi i një kolektori të funksionit lambda

Le të krijojmë një funksion lambda të quajtur Kolektor, detyra e të cilit do të jetë të anketojë rrjedhën e biletave ajrore dhe, nëse gjenden rekorde të reja, t'i fusim këto të dhëna në tabelën DynamoDB. Natyrisht, përveç të drejtave të paracaktuara, kjo lambda duhet të ketë qasje leximi në rrjedhën e të dhënave Kinesis dhe qasje shkrimi në DynamoDB.

Krijimi i një roli IAM për funksionin lambda të kolektorit
Së pari, le të krijojmë një rol të ri IAM për lambda të quajtur Lambda-TicketsProcessingRole:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Për shembullin e provës, politikat e para-konfiguruara të AmazonKinesisReadOnlyAccess dhe AmazonDynamoDBFullAccess janë mjaft të përshtatshme, siç tregohet në foton më poshtë:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server

Kjo lambda duhet të lëshohet nga një shkas nga Kinesis kur hyrjet e reja hyjnë në airline_stream, kështu që ne duhet të shtojmë një shkas të ri:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Gjithçka që mbetet është të futni kodin dhe të ruani lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Krijimi i një njoftimi të funksionit lambda

Funksioni i dytë lambda, i cili do të monitorojë transmetimin e dytë (special_stream) dhe do të dërgojë një njoftim në SNS, është krijuar në mënyrë të ngjashme. Prandaj, kjo lambda duhet të ketë akses për të lexuar nga Kinesis dhe për të dërguar mesazhe në një temë të caktuar SNS, të cilat më pas do t'u dërgohen nga shërbimi SNS të gjithë abonentëve të kësaj teme (email, SMS, etj.).

Krijimi i një roli IAM
Së pari, ne krijojmë rolin IAM Lambda-KinesisAlarm për këtë lambda dhe më pas ia caktojmë këtë rol alarmit_notifier lambda që po krijohet:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server

Kjo lambda duhet të funksionojë në një shkas që regjistrimet e reja të hyjnë në special_stream, kështu që ju duhet të konfiguroni këmbëzën në të njëjtën mënyrë si ne për Lambda e Kolektorit.

Për ta bërë më të lehtë konfigurimin e kësaj lambda, le të prezantojmë një variabël të ri mjedisor - TOPIC_ARN, ku vendosim ANR (Emrat e burimeve të Amazonës) të temës së Linjave ajrore:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Dhe futni kodin lambda, nuk është aspak e komplikuar:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Duket se këtu përfundon konfigurimi manual i sistemit. E tëra që mbetet është të testojmë dhe të sigurohemi që kemi konfiguruar gjithçka në mënyrë korrekte.

Vendoseni nga kodi Terraform

Përgatitja e nevojshme

Terraform është një mjet shumë i përshtatshëm me burim të hapur për vendosjen e infrastrukturës nga kodi. Ajo ka sintaksën e vet që është e lehtë për t'u mësuar dhe ka shumë shembuj se si dhe çfarë të vendoset. Redaktori Atom ose Visual Studio Code ka shumë shtojca të dobishme që e bëjnë më të lehtë punën me Terraform.

Ju mund të shkarkoni shpërndarjen prandaj. Një analizë e detajuar e të gjitha aftësive të Terraform është përtej qëllimit të këtij artikulli, kështu që ne do të kufizohemi në pikat kryesore.

Si të vraponi

Kodi i plotë i projektit është në depon time. Ne e klonojmë depon për veten tonë. Përpara se të filloni, duhet të siguroheni që keni instaluar dhe konfiguruar AWS CLI, sepse... Terraform do të kërkojë kredencialet në skedarin ~/.aws/credentials.

Një praktikë e mirë është të ekzekutoni komandën e planit përpara se të vendosni të gjithë infrastrukturën për të parë se çfarë Terraform po krijon aktualisht për ne në cloud:

terraform.exe plan

Do t'ju kërkohet të vendosni një numër telefoni për të dërguar njoftime. Nuk është e nevojshme ta futni atë në këtë fazë.

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Pasi të kemi analizuar planin e funksionimit të programit, mund të fillojmë të krijojmë burime:

terraform.exe apply

Pas dërgimit të kësaj komande, do t'ju kërkohet përsëri të vendosni një numër telefoni; thirrni "po" kur shfaqet një pyetje në lidhje me kryerjen e vërtetë të veprimeve. Kjo do t'ju lejojë të konfiguroni të gjithë infrastrukturën, të kryeni të gjithë konfigurimin e nevojshëm të EC2, të vendosni funksionet lambda, etj.

Pasi të gjitha burimet të jenë krijuar me sukses përmes kodit Terraform, duhet të futeni në detajet e aplikacionit Kinesis Analytics (për fat të keq, nuk gjeta se si ta bëj këtë direkt nga kodi).

Nisni aplikacionin:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Pas kësaj, duhet të vendosni në mënyrë eksplicite emrin e transmetimit brenda aplikacionit duke zgjedhur nga lista rënëse:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Tani gjithçka është gati për të shkuar.

Testimi i aplikacionit

Pavarësisht se si e keni vendosur sistemin, manualisht ose përmes kodit Terraform, ai do të funksionojë njësoj.

Ne identifikohemi nëpërmjet SSH në makinën virtuale EC2 ku është instaluar Kinesis Agent dhe ekzekutojmë skriptin api_caller.py

sudo ./api_caller.py TOKEN

Gjithçka që duhet të bëni është të prisni një SMS në numrin tuaj:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
SMS - mesazhi arrin në telefon në pothuajse 1 minutë:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server
Mbetet për të parë nëse të dhënat janë ruajtur në bazën e të dhënave DynamoDB për analiza të mëvonshme, më të detajuara. Tabela e biletave të linjës ajrore përmban afërsisht të dhënat e mëposhtme:

Integrimi i Aviasales API me Amazon Kinesis dhe thjeshtësia pa server

Përfundim

Gjatë punës së bërë, u ndërtua një sistem përpunimi i të dhënave në internet bazuar në Amazon Kinesis. Janë marrë në konsideratë opsionet për përdorimin e Kinesis Agent në lidhje me Kinesis Data Streams dhe analitikë në kohë reale Kinesis Analytics duke përdorur komandat SQL, si dhe ndërveprimin e Amazon Kinesis me shërbimet e tjera AWS.

Ne vendosëm sistemin e mësipërm në dy mënyra: një manual mjaft të gjatë dhe një të shpejtë nga kodi Terraform.

I gjithë kodi burimor i projektit është i disponueshëm në depon time të GitHub, ju sugjeroj të njiheni me të.

Jam i lumtur të diskutoj artikullin, pres me padurim komentet tuaja. Shpresoj për kritika konstruktive.

Ju uroj suksese!

Burimi: www.habr.com

Shto një koment