Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin

Aupa Habr!

Hegazkinak hegan egitea gustatzen al zaizu? Maite dut, baina auto-isolamenduan zehar baliabide ezagun batetik - Aviasales-eko hegazkin-txartelei buruzko datuak aztertzeaz ere maitemindu nintzen.

Gaur Amazon Kinesis-en lana aztertuko dugu, denbora errealeko analisiekin streaming sistema bat eraikiko dugu, Amazon DynamoDB NoSQL datu-basea instalatuko dugu datu biltegiratze nagusi gisa eta SMS jakinarazpenak ezarriko ditugu sarrera interesgarrietarako.

Xehetasun guztiak mozketa azpian daude! Joan!

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin

Sarrera

Adibidez, sarbidea behar dugu Aviasales APIa. Doan eta murrizketarik gabe ematen da sarbidea; "Garatzaileak" atalean erregistratu besterik ez duzu behar zure API tokena jasotzeko datuak sartzeko.

Artikulu honen helburu nagusia AWS-en informazioaren streaming-aren erabileraren ulermen orokorra ematea da; kontuan hartzen dugu erabilitako APIak itzultzen dituen datuak ez direla guztiz eguneratuak eta cachetik transmititzen direla, hau da. Azken 48 orduetan Aviasales.ru eta Jetradar.com guneetako erabiltzaileek egindako bilaketetan oinarrituta eratua.

Kinesis-agenteak, ekoizle makinan instalatuta, API bidez jasotako datuak automatikoki analizatu eta transmitituko ditu nahi den korronteari Kinesis Data Analytics bidez. Korronte honen bertsio gordina zuzenean idatziko da dendan. DynamoDB-n inplementatutako datu gordinak biltegiratzeak txartelak aztertzeko aukera emango du BI tresnen bidez, hala nola AWS Quick Sight.

Azpiegitura osoa zabaltzeko bi aukera aztertuko ditugu:

  • Eskuliburua - AWS Kudeaketa kontsolaren bidez;
  • Terraform kodearen azpiegitura automatizatzaile alferrarentzat da;

Garatutako sistemaren arkitektura

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Erabilitako osagaiak:

  • Aviasales APIa β€” API honek itzultzen dituen datuak ondorengo lan guztietan erabiliko dira;
  • EC2 Producer Instantzia β€” hodeian dagoen makina birtual arrunta, non sarrerako datu-korrontea sortuko den:
    • Kinesis Agentea Makinan lokalean instalatutako Java aplikazio bat da, Kinesis-era (Kinesis Data Streams edo Kinesis Firehose) datuak biltzeko eta bidaltzeko modu erraz bat eskaintzen duena. Agenteak etengabe kontrolatzen du zehaztutako direktorioetako fitxategi multzo bat eta datu berriak bidaltzen ditu Kinesis-era;
    • API Deitzailearen Scripta β€” APIari eskaerak egiten dizkion Python script bat eta erantzuna Kinesis Agenteak kontrolatzen duen karpeta batean jartzen du;
  • Kinesis Data Streams β€” denbora errealeko datu-streaming zerbitzua eskalatzeko gaitasun handiekin;
  • Kinesis Analytics zerbitzaririk gabeko zerbitzu bat da, denbora errealean streaming datuak aztertzea errazten duena. Amazon Kinesis Data Analytics aplikazio-baliabideak konfiguratzen ditu eta automatikoki eskalatzen ditu sarrerako edozein datu-bolumen kudeatzeko;
  • AWS Lambda β€” kodea exekutatzeko aukera ematen duen zerbitzua, babeskopia egin edo zerbitzariak konfiguratu gabe. Konputazio-potentzia guztia automatikoki eskalatzen da dei bakoitzeko;
  • Amazon DynamoDB - Gako-balio bikoteen eta dokumentuen datu-basea, edozein eskalatan exekutatzen denean 10 milisegundo baino gutxiagoko latentzia eskaintzen duena. DynamoDB erabiltzean, ez duzu zerbitzaririk hornitu, adabaki edo kudeatu beharrik. DynamoDB-k automatikoki eskalatzen ditu taulak eskuragarri dauden baliabideen kopurua doitzeko eta errendimendu handia mantentzeko. Ez da sistemaren administrazioa behar;
  • Amazon SNS - Argitaletxe-harpidedun (Pub/Sub) eredua erabiliz mezuak bidaltzeko guztiz kudeatutako zerbitzua, zeinarekin mikrozerbitzuak, sistema banatuak eta zerbitzaririk gabeko aplikazioak isolatu ahal izateko. SNS erabil daiteke azken erabiltzaileei informazioa bidaltzeko mugikorreko push jakinarazpenen, SMS mezuen eta mezu elektronikoen bidez.

Hasierako prestakuntza

Datu-fluxua emulatzeko, Aviasales APIak itzultzen duen hegazkin-txartelen informazioa erabiltzea erabaki nuen. IN dokumentazioa metodo ezberdinen zerrenda nahiko zabala, har dezagun horietako bat - "Hileroko prezioen egutegia", hilabeteko egun bakoitzeko prezioak itzultzen dituena, transferentzia kopuruaren arabera taldekatuta. Eskaeran bilaketa-hilabetea zehazten ez baduzu, uneko hilabetearen ondorengo informazioa itzuliko da.

Beraz, erregistra gaitezen eta lortu gure tokena.

Behean dago eskaera adibide bat:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Eskaeran token bat zehaztuz APItik datuak jasotzeko goiko metodoak funtzionatuko du, baina nahiago dut sarbide-tokena goiburutik pasatzea, beraz, metodo hau api_caller.py script-ean erabiliko dugu.

Erantzunaren adibidea:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Goiko APIaren erantzunaren adibidean San Petersburgotik Phukera txartel bat erakusten da... Oh, zer ametsa...
Kazangoa naizenez, eta Phuket orain "amets bat baino ez" denez, bila ditzagun San Petersburgotik Kazanera txartelak.

Dagoeneko AWS kontu bat duzula suposatzen du. Berehala arreta berezia jarri nahi dut Kinesis eta jakinarazpenak SMS bidez bidaltzea ez daudela urteko urteko Doako maila (doako erabilera). Baina hala ere, dolar pare bat kontuan hartuta, nahiko posible da proposatutako sistema eraikitzea eta horrekin jolastea. Eta, noski, ez ahaztu baliabide guztiak ezabatzea behar ez direnean.

Zorionez, DynamoDb eta lambda funtzioak doakoak izango zaizkigu hileroko doako mugak betetzen baditugu. Adibidez, DynamoDBrako: 25 GB biltegiratze, 25 WCU/RCU eta 100 milioi kontsulta. Eta milioi bat lambda funtzio dei hilean.

Sistema eskuz zabaltzea

Kinesis Data Stream konfiguratzea

Goazen Kinesis Data Streams zerbitzura eta sortu bi korronte berri, zati bat bakoitzeko.

Zer da zati bat?
Zati bat Amazon Kinesis korronte baten oinarrizko datu-transferentzia-unitatea da. Segmentu batek sarrerako datuen transferentzia 1 MB/s-ko abiaduran eskaintzen du eta irteerako datuen transferentzia 2 MB/s-ko abiaduran. Segmentu batek segundoko 1000 PUT sarrera onartzen ditu. Datu-korronte bat sortzean, beharrezko segmentu kopurua zehaztu behar duzu. Adibidez, datu-korronte bat sor dezakezu bi segmenturekin. Datu-korronte honek sarrerako datuen transferentzia emango du 2 MB/s-ko eta irteerako datuen transferentzia 4 MB/s-ko, segundoko 2000 PUT erregistro onartzen ditu.

Zenbat eta zati gehiago korrontean, orduan eta errendimendu handiagoa. Printzipioz, horrela eskalatzen dira fluxuak - zatiak gehituz. Baina zenbat eta zati gehiago izan, orduan eta prezio handiagoa izango da. Zati bakoitzak 1,5 zentimo balio du orduko eta 1.4 zentimo gehiago PUT karga-unitate bakoitzeko.

Sortu dezagun korronte berri bat izenarekin airelinea_txartelak, zati 1 nahikoa izango da berarentzat:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Orain sortu dezagun beste hari bat izenarekin korronte_bereziak:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin

Ekoizlearen konfigurazioa

Zeregin bat aztertzeko, nahikoa da EC2 instantzia arrunt bat erabiltzea datu-ekoizle gisa. Ez du zertan makina birtual indartsu eta garestia izan; t2.micro spot batek ondo egingo du.

Ohar garrantzitsua: adibidez, irudia erabili beharko zenuke - Amazon Linux AMI 2018.03.0, Kinesis Agentea azkar abiarazteko ezarpen gutxiago ditu.

Joan EC2 zerbitzura, sortu makina birtual berri bat, hautatu nahi duzun AMI t2.micro motarekin, Doako Mailan sartuta dagoena:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Sortu berri den makina birtualak Kinesis zerbitzuarekin elkarreragin ahal izateko, horretarako eskubideak eman behar zaizkio. Horretarako modurik onena IAM Rol bat esleitzea da. Hori dela eta, 3. urratsa: Konfiguratu Instantziaren xehetasunak pantailan, hautatu beharko zenuke Sortu IAM Rol berria:

EC2rako IAM rola sortzea
Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Irekitzen den leihoan, hautatu EC2-rako rol berri bat sortzen ari garela eta joan Baimenak atalera:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Prestakuntza-adibidea erabiliz, ez dugu baliabide-eskubideen konfigurazio pikorren konplexutasun guztietan sartu beharrik, beraz, Amazonek aurrez konfiguratutako politikak hautatuko ditugu: AmazonKinesisFullAccess eta CloudWatchFullAccess.

Eman diezaiogun izen esanguratsu bat rol honi, adibidez: EC2-KinesisStreams-FullAccess. Emaitza beheko irudian agertzen den berdina izan behar du:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Rol berri hau sortu ondoren, ez ahaztu sortutako makina birtualeko instantziari eranstea:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Pantaila honetan ez dugu beste ezer aldatzen eta hurrengo leihoetara joango gara.

Disko gogorreko ezarpenak lehenetsi gisa utz daitezke, baita etiketak ere (etiketak erabiltzea praktika ona den arren, gutxienez instantziari izena eman eta ingurunea adierazi).

Orain 6. urratsean gaude: Konfiguratu Segurtasun Taldea fitxan, non berri bat sortu behar duzun edo lehendik duzun Segurtasun taldea zehaztu behar duzun, ssh bidez (22 ataka) instantziara konektatzeko aukera ematen duena. Hautatu Iturria -> Nire IPa bertan eta instantzia abiarazi dezakezu.

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Exekutatzen egoerara aldatu bezain laster, ssh bidez konektatzen saia zaitezke.

Kinesis Agent-ekin lan egin ahal izateko, makinara ongi konektatu ondoren, komando hauek sartu behar dituzu terminalean:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Sortu dezagun karpeta bat API erantzunak gordetzeko:

sudo mkdir /var/log/airline_tickets

Agentea hasi aurretik, bere konfigurazioa konfiguratu behar duzu:

sudo vim /etc/aws-kinesis/agent.json

Agent.json fitxategiaren edukiak honelakoa izan beharko luke:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Konfigurazio fitxategian ikus daitekeenez, agenteak /var/log/airline_tickets/ direktorioko .log luzapena duten fitxategiak kontrolatuko ditu, analizatu eta airline_tickets korrontera transferituko ditu.

Zerbitzua berrabiarazi eta martxan dagoela ziurtatzen dugu:

sudo service aws-kinesis-agent restart

Orain deskargatu dezagun APItik datuak eskatuko dituen Python script-a:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py script-ak Aviasales-i datuak eskatzen dizkio eta jasotako erantzuna Kinesis agenteak arakatzen duen direktorioan gordetzen du. Script honen ezarpena nahiko estandarra da, TicketsApi klase bat dago, APIa modu asinkronoan ateratzeko aukera ematen du. Token batekin goiburu bat pasatzen dugu eta parametroak eskatzen dizkiogu klase honi:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Agentearen ezarpen eta funtzionaltasun egokiak probatzeko, proba dezagun api_caller.py scripta exekutatu:

sudo ./api_caller.py TOKEN

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Eta lanaren emaitza agenteen erregistroetan eta Airline_tickets datu-korronteko Jarraipen fitxan ikusten dugu:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Ikus dezakezun bezala, dena funtzionatzen du eta Kinesis Agent-ek datuak arrakastaz bidaltzen ditu korrontera. Orain konfigura dezagun kontsumitzailea.

Kinesis Data Analytics konfiguratzea

Goazen sistema osoaren osagai nagusira - sortu Kinesis Data Analytics-en kinesis_analytics_airlines_app izeneko aplikazio berri bat:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Kinesis Data Analytics-ek Kinesis Streams-en datuen analisiak denbora errealean egiteko aukera ematen du SQL lengoaia erabiliz. Guztiz autoeskalatze-zerbitzu bat da (Kinesis Streams ez bezala) hau:

  1. korronte berriak sortzeko aukera ematen du (Output Stream) datu iturrietarako eskaeretan oinarrituta;
  2. aplikazioak exekutatzen ari ziren bitartean gertatutako akatsekin korronte bat eskaintzen du (Error Stream);
  3. automatikoki zehaztu dezake sarrerako datuen eskema (beharrezkoa bada eskuz birdefinitu daiteke).

Hau ez da zerbitzu merkea - 0.11 USD lan ordu bakoitzeko, beraz, kontu handiz erabili behar duzu eta amaitutakoan ezabatu.

Konektatu dezagun aplikazioa datu-iturburura:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Hautatu konektatuko garen korrontea (airline_tickets):

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Ondoren, IAM Rol berri bat erantsi behar duzu aplikazioak korrontetik irakurri eta korrontean idatzi dezan. Horretarako, nahikoa da Sarbide baimenen blokean ezer ez aldatzea:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Orain eska dezagun korrontean datu-eskema aurkitzea; horretarako, egin klik "Ezagutu eskema" botoian. Ondorioz, IAM rola eguneratuko da (berri bat sortuko da) eta eskemaren detekzioa abiaraziko da korrontean dagoeneko iritsi diren datuetatik:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Orain SQL editorera joan behar duzu. Botoi honetan klik egiten duzunean, leiho bat agertuko da aplikazioa abiarazteko eskatuz - hautatu zer abiarazi nahi duzun:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Sartu honako kontsulta sinple hau SQL editorearen leihoan eta egin klik Gorde eta Exekutatu SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Datu-base erlazionaletan, taulekin lan egiten duzu INSERT instrukzioak erabiliz erregistroak gehitzeko eta SELECT sententzia bat datuak kontsultatzeko. Amazon Kinesis Data Analytics-en, korronteekin (STREAM) eta ponpekin (PUMP) lan egiten duzu, aplikazio bateko korronte bateko datuak beste korronte batean txertatzen dituzten etengabeko txertatzeko eskaerak.

Goian aurkeztutako SQL kontsultak Aeroflot txartelak bilatzen ditu bost mila errublotik beherako kostuarekin. Baldintza hauek betetzen dituzten erregistro guztiak DESTINATION_SQL_STREAM korrontean jarriko dira.

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Helmuga blokean, hautatu special_stream korrontea eta aplikazio barruko korrontearen izena DESTINATION_SQL_STREAM goitibeherako zerrendan:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Manipulazio guztien emaitzak beheko irudiaren antzeko zerbait izan behar du:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin

SNS gai bat sortzea eta harpidetzea

Joan Jakinarazpen Soilaren Zerbitzura eta sortu gai berri bat Airelineak izenarekin:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Harpidetu gai honetara eta adierazi zein telefono-zenbakira bidaliko diren SMS jakinarazpenak:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin

Sortu taula bat DynamoDB-n

Airline_tickets korronteko datu gordinak gordetzeko, sor dezagun taula bat DynamoDB-n izen bereko. Record_id gako nagusi gisa erabiliko dugu:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin

Lambda funtzio-biltzaile bat sortzea

Sortu dezagun Collector izeneko lambda funtzio bat, zeinaren zeregina airline_tickets korrontea galdetzea eta, bertan erregistro berriak aurkitzen badira, erregistro horiek DynamoDB taulan txertatzea izango da. Jakina, lehenetsitako eskubideez gain, lambda honek Kinesis datu-korronterako irakurtzeko sarbidea eta DynamoDBrako idazteko sarbidea izan behar du.

Biltzaile lambda funtziorako IAM rola sortzea
Lehenik eta behin, sor dezagun IAM rol berri bat Lambda-TicketsProcessingRole izeneko lambdarako:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Proba adibiderako, aurrez konfiguratutako AmazonKinesisReadOnlyAccess eta AmazonDynamoDBFullAccess politikak nahiko egokiak dira, beheko irudian ikusten den bezala:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin

Lambda hau Kinesis-en abiarazle baten bidez abiarazi behar da sarrera berriak airline_stream-era sartzen direnean, beraz, abiarazle berri bat gehitu behar dugu:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Kodea sartu eta lambda gordetzea besterik ez da geratzen.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda funtzioaren jakinarazlea sortzea

Bigarren lambda funtzioa, bigarren korrontea (special_stream) kontrolatuko duena eta SNSra jakinarazpena bidaliko duena, antzera sortzen da. Beraz, lambda honek Kinesisetik irakurtzeko eta SNS gai jakin batera mezuak bidaltzeko sarbidea izan behar du, gero SNS zerbitzuak gai honen harpidedun guztiei bidaliko dizkie (posta elektronikoa, SMSa, etab.).

IAM rola sortzea
Lehenik eta behin, IAM rola Lambda-KinesisAlarm sortzen dugu lambda honetarako, eta gero esleitu eginkizun hau sortzen ari den alarm_notifier lambda-ri:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin

Lambda honek erregistro berriak special_stream-en sartzeko abiarazle batean lan egin beharko luke, beraz, abiarazlea Collector lambda-rako egin genuen moduan konfiguratu behar duzu.

Lambda hau konfiguratzea errazteko, txerta dezagun ingurune-aldagai berri bat - TOPIC_ARN, non Airlines gaiaren ANR (Amazon Recourse Names) jartzen dugun:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Eta sartu lambda kodea, ez da batere konplikatua:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Badirudi hemen amaitzen dela sistemaren eskuzko konfigurazioa. Guztia behar bezala konfiguratu dugula probatzea eta ziurtatzea besterik ez da geratzen.

Inplementatu Terraform kodetik

Beharrezko prestaketa

Terraform kode irekiko tresna oso erosoa da azpiegitura kodetik zabaltzeko. Ikasteko erraza den sintaxi propioa du eta nola eta zer zabaldu behar den adibide asko ditu. Atom editoreak edo Visual Studio Codek plugin erabilgarri asko ditu, Terraform-ekin lan egitea errazten dutenak.

Banaketa deskargatu dezakezu beraz,. Terraform gaitasun guztien azterketa zehatza artikulu honen esparrutik kanpo dago, beraz, puntu nagusietara mugatuko gara.

Nola hasi

Proiektuaren kode osoa da nire biltegian. Biltegia geuretzat klonatzen dugu. Hasi aurretik, ziurtatu behar duzu AWS CLI instalatuta eta konfiguratuta duzula, izan ere... Terraformek kredentzialak bilatuko ditu ~/.aws/credentials fitxategian.

Praktika ona da plan komandoa exekutatu azpiegitura osoa zabaldu aurretik, Terraformek une honetan hodeian zer sortzen ari zaigun ikusteko:

terraform.exe plan

Jakinarazpenak bidaltzeko telefono-zenbaki bat sartzeko eskatuko zaizu. Ez da beharrezkoa fase honetan sartzea.

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Programaren funtzionamendu-plana aztertuta, baliabideak sortzen has gaitezke:

terraform.exe apply

Komando hau bidali ondoren, berriro telefono-zenbaki bat sartzeko eskatuko zaizu; markatu "bai" ekintzak benetan egiteari buruzko galdera bat erakusten denean. Horrek azpiegitura osoa konfiguratzeko, EC2ren beharrezko konfigurazio guztiak egiteko, lambda funtzioak zabaltzeko, etab.

Baliabide guztiak Terraform kodearen bidez arrakastaz sortu ondoren, Kinesis Analytics aplikazioaren xehetasunetan sartu behar duzu (zoritxarrez, ez dut aurkitu kodetik zuzenean nola egin).

Abiarazi aplikazioa:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Horren ondoren, aplikazio barruko korrontearen izena esplizituki ezarri behar duzu goitibeherako zerrendatik hautatuta:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Orain dena prest dago.

Aplikazioa probatzen

Sistema nola zabaldu duzun edozein dela ere, eskuz edo Terraform kodearen bidez, berdin funtzionatuko du.

SSH bidez hasten gara Kinesis Agent instalatuta dagoen EC2 makina birtualean eta api_caller.py script-a exekutatzen dugu

sudo ./api_caller.py TOKEN

Zure zenbakira SMS bat itxaron besterik ez duzu egin behar:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
SMSa: ia minutu batean iristen da mezua telefonora:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin
Erregistroak DynamoDB datu-basean gorde diren ala ez ikustea falta da, ondorengo azterketa zehatzagoa egiteko. airline_tickets taulak datu hauek ditu gutxi gorabehera:

Aviasales API integrazioa Amazon Kinesis-ekin eta zerbitzaririk gabeko sinpletasunarekin

Ondorioa

Egindako lanetan, online datuak prozesatzeko sistema bat eraiki zen Amazon Kinesis-en oinarrituta. Kinesis Agentea Kinesis Data Streams-ekin eta denbora errealeko analisiarekin batera Kinesis Agentea erabiltzeko aukerak kontuan hartu ziren SQL komandoak erabiliz Kinesis Analytics, baita Amazon Kinesis-ek AWSko beste zerbitzu batzuekin duen elkarrekintza ere.

Goiko sistema bi modutan zabaldu dugu: eskuliburu luze samarra eta Terraform kodearen bizkorra.

Proiektuaren iturburu-kode guztia eskuragarri dago nire GitHub biltegian, ezagutzea proposatzen dizut.

Pozik nago artikulua eztabaidatzeaz, zure iruzkinak espero ditut. Kritika eraikitzaileak espero ditut.

Nahi dut arrakasta!

Iturria: www.habr.com

Gehitu iruzkin berria