Integrating the Aviasales API with Amazon Kinesis, and the simplicity of serverless

Hi Habr!

Do you like flying? I do, but during self-isolation I also fell in love with analyzing airfare data from one well-known source, Aviasales.

Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, set up the Amazon DynamoDB NoSQL database as the main data store, and configure SMS notifications for interesting tickets.

All the details are under the cut. Let's go!


Introduction

For our example, we need access to the Aviasales API. Access is free and unrestricted; you only need to register in the "Developers" section to get your API token for accessing the data.

The main purpose of this article is to give a general understanding of how to use streaming data in AWS. Keep in mind that the data returned by the API is not strictly up to date: it is served from a cache built from the searches made by users of the Aviasales.ru and Jetradar.com sites over the last 48 hours.

The Kinesis Agent installed on the producer machine will automatically parse the data received from the API and send it to a Kinesis data stream, from which Kinesis Data Analytics routes the records we care about into a separate stream. The raw version of the stream will be written directly to storage. A raw data store deployed in DynamoDB makes deeper ticket analysis possible with BI tools such as Amazon QuickSight.

We will consider two ways to deploy the whole infrastructure:

  • Manually, through the AWS Management Console;
  • From Terraform code, for lazy automators.

Architecture of the system being developed

Components used:

  • Aviasales API: the data returned by this API will be used for all subsequent work;
  • EC2 Producer Instance: an ordinary virtual machine in the cloud on which the input data stream will be generated:
    • Kinesis Agent is a Java application installed locally on the machine that provides an easy way to collect and send data to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent continuously monitors a set of files in the specified directories and sends new data to Kinesis;
    • API Caller Script: a Python script that makes requests to the API and puts the responses into the folder monitored by the Kinesis Agent;
  • Kinesis Data Streams: a real-time data streaming service with broad scaling capabilities;
  • Kinesis Analytics: a serverless service that simplifies the analysis of streaming data in real time. Amazon Kinesis Data Analytics configures the application resources and scales automatically to handle any volume of incoming data;
  • AWS Lambda: a service that lets you run code without provisioning or managing servers. Compute capacity is scaled automatically for each invocation;
  • Amazon DynamoDB: a key-value and document database that delivers latency of less than 10 milliseconds at any scale. With DynamoDB you don't need to provision, patch, or manage any servers. DynamoDB automatically scales tables to adjust the amount of available resources and maintain high performance. No system administration is required;
  • Amazon SNS: a fully managed messaging service based on the publisher-subscriber (Pub/Sub) model, which lets you decouple microservices, distributed systems, and serverless applications. SNS can be used to deliver information to end users via mobile push notifications, SMS messages, and email.

Preparation

To emulate a data stream, I decided to use the airline ticket information returned by the Aviasales API. The documentation lists quite a few methods; let's take one of them, the "Month price calendar", which returns prices for each day of a month, grouped by the number of transfers. If you don't specify a search month in the request, the data returned is for the month following the current one.

So, let's register and get our token.

An example request is below:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Passing the token as a request parameter, as above, works, but I prefer to pass the access token in a header, so that is the approach used in the api_caller.py script.
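For comparison, here is a minimal standard-library sketch that keeps the search parameters in the query string and moves the token into the `X-Access-Token` header, the same header the api_caller.py code below uses. `build_request` is a hypothetical helper, and the LED to KZN route (St. Petersburg to Kazan) is just an example; the request is only built here, not sent.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Endpoint from the example request above.
BASE_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_request(api_token, origin="LED", destination="KZN", currency="rub"):
    """Build a GET request whose query string carries no token."""
    params = {
        "currency": currency,
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    }
    url = f"{BASE_URL}?{urlencode(params)}"
    # The token travels in a header instead of the ?token= parameter.
    return Request(url, headers={"X-Access-Token": api_token})


req = build_request("TOKEN_API")
print(req.full_url)
# urllib normalizes header names with str.capitalize().
print(req.get_header("X-access-token"))
```

Keeping the token out of the URL also keeps it out of any access logs that record full request URLs.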

Example response:

{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

The example API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I'm from Kazan, and Phuket is "only a dream" for now, let's look for tickets from St. Petersburg to Kazan.

It is assumed that you already have an AWS account. I want to point out right away that Kinesis and SMS notifications are not included in the 12-month Free Tier (free usage). Even so, with a couple of dollars to spare, you can build the proposed system and play with it. And, of course, don't forget to delete all resources once they are no longer needed.

Fortunately, DynamoDB and Lambda functions will be free for us as long as we stay within the monthly free limits. For DynamoDB, for example: 25 GB of storage, 25 WCU/RCU and 25 million requests. For Lambda, a million function invocations per month.

Manual system deployment

Creating Kinesis Data Streams

Let's go to the Kinesis Data Streams service and create two new streams, with one shard each.

What is a shard?
A shard is the basic throughput unit of an Amazon Kinesis stream. One shard provides data input at 1 MB/s and data output at 2 MB/s, and supports up to 1,000 PUT records per second. When creating a data stream, you must specify the required number of shards. For example, you can create a data stream with two shards. Such a stream provides data input at 2 MB/s and data output at 4 MB/s, and supports up to 2,000 PUT records per second.
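The shard arithmetic above is easy to script. A small sketch using the per-shard limits quoted in this section; `stream_capacity` and `shards_needed` are hypothetical helpers, and real sizing should also account for the number of consumers reading the stream.

```python
import math

# Per-shard limits quoted above.
WRITE_MB_PER_SEC = 1
READ_MB_PER_SEC = 2
WRITE_RECORDS_PER_SEC = 1000


def stream_capacity(shards):
    """Aggregate limits of a stream with the given number of shards."""
    return {
        "write_mb_s": shards * WRITE_MB_PER_SEC,
        "read_mb_s": shards * READ_MB_PER_SEC,
        "write_records_s": shards * WRITE_RECORDS_PER_SEC,
    }


def shards_needed(write_mb_s, write_records_s):
    """Smallest shard count that satisfies both write limits."""
    return max(1,
               math.ceil(write_mb_s / WRITE_MB_PER_SEC),
               math.ceil(write_records_s / WRITE_RECORDS_PER_SEC))


print(stream_capacity(2))  # {'write_mb_s': 2, 'read_mb_s': 4, 'write_records_s': 2000}
print(shards_needed(0.5, 2500))  # 3
```

Whichever limit is hit first decides the shard count: 2,500 small records per second need three shards even though they fit in 1 MB/s.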

The more shards your stream has, the higher its throughput. In essence, this is how a stream is scaled: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus 1.4 cents per million PUT payload units.
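A rough monthly estimate using the two prices quoted above. This sketch assumes a 730-hour month and that each record consumes one PUT payload unit per started 25 KB; it ignores extended retention and other extras, and actual prices vary by region and over time. `monthly_cost` is a hypothetical helper.

```python
import math

SHARD_HOUR_USD = 0.015          # 1.5 cents per shard-hour (quoted above)
PUT_UNITS_MILLION_USD = 0.014   # 1.4 cents per million PUT payload units
PUT_UNIT_BYTES = 25 * 1024      # one payload unit covers up to 25 KB of a record


def monthly_cost(shards, records_per_month, record_bytes, hours=730):
    """Rough monthly stream cost in USD."""
    units_per_record = math.ceil(record_bytes / PUT_UNIT_BYTES)
    put_units = records_per_month * units_per_record
    shard_cost = shards * hours * SHARD_HOUR_USD
    put_cost = put_units / 1_000_000 * PUT_UNITS_MILLION_USD
    return round(shard_cost + put_cost, 2)


# One shard, a million 500-byte records per month.
print(monthly_cost(1, 1_000_000, 500))  # 10.96
```

For a small test stream the shard-hours dominate, which is why deleting idle streams matters more than trimming record counts.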

Let's create a new stream named airline_tickets; one shard will be enough for it:

Now let's create another stream named special_stream:

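The same two single-shard streams can also be created from code. A sketch assuming a boto3 Kinesis client (for example `boto3.client("kinesis")`) and credentials allowed to create streams; `create_ticket_streams` is a hypothetical helper.

```python
def create_ticket_streams(kinesis, names=("airline_tickets", "special_stream"), shards=1):
    """Create each Kinesis data stream and wait until it is ACTIVE.

    `kinesis` is expected to be a boto3 Kinesis client, e.g.
    create_ticket_streams(boto3.client("kinesis")).
    """
    # The "stream_exists" waiter polls DescribeStream until the stream is ACTIVE.
    waiter = kinesis.get_waiter("stream_exists")
    for name in names:
        kinesis.create_stream(StreamName=name, ShardCount=shards)
        waiter.wait(StreamName=name)
    return list(names)
```

Waiting for each stream to become ACTIVE matters because the agent and Kinesis Analytics cannot use a stream that is still being created.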

Setting up the producer

For testing, an ordinary EC2 instance is enough as a data producer. It doesn't need to be a powerful, expensive virtual machine; a spot t2.micro will do just fine.

Important note: for this example, you should use the Amazon Linux AMI 2018.03.0 image, as it requires the fewest settings to get the Kinesis Agent running quickly.

Go to the EC2 service, create a new virtual machine, and select the required AMI with the t2.micro instance type, which is included in the Free Tier:

For the newly created virtual machine to be able to interact with the Kinesis service, it must be granted the rights to do so. The best way to do this is to assign an IAM role. So, on the Step 3: Configure Instance Details screen, select Create new IAM role:

Creating an IAM role for EC2
In the window that opens, choose to create a new role for EC2 and go to the Permissions section:

Since this is a training example, we don't need to go into all the intricacies of granular resource permissions, so we'll select the policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.

Let's give this role a meaningful name, for example EC2-KinesisStreams-FullAccess. The result should look like the screenshot below:

After creating this new role, don't forget to attach it to the virtual machine instance being created:

We don't change anything else on this screen and move on to the next windows.

The hard drive settings can be left at their defaults, as can the tags (although using tags is good practice; at least give the instance a name and specify the environment).

Now we are on the Step 6: Configure Security Group tab, where you need to create a new security group or specify an existing one that allows you to connect to the instance via ssh (port 22). Select Source -> My IP there, and you can launch the instance.

As soon as the instance switches to the running state, you can try to connect to it via ssh.

To work with the Kinesis Agent, after successfully connecting to the machine, enter the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Let's create a folder to store the API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to edit its config:

sudo vim /etc/aws-kinesis/agent.json

The contents of the agent.json file should look like this:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As you can see from the configuration file, the agent will monitor files whose names end in log (such as .log files) in the /var/log/airline_tickets/ directory, parse them, and send them to the airline_tickets stream.
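To see roughly what the `CSVTOJSON` option produces, here is a local approximation: each line of a watched file is split as CSV and zipped with `customFieldNames`. The agent itself is a Java application, so this is only an illustration assuming comma-separated lines; note that every value comes out as a string, which fits with the Collector Lambda later converting numeric fields back. `csv_line_to_json` and the sample line are made up for illustration.

```python
import csv
import io
import json

# customFieldNames from agent.json, in the same order.
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]


def csv_line_to_json(line, field_names=FIELD_NAMES):
    """Approximate CSVTOJSON for one log line: CSV values keyed by field name."""
    values = next(csv.reader(io.StringIO(line)))
    return json.dumps(dict(zip(field_names, values)))


# A made-up log line, not real API data.
sample = "4500.0,0,True,,LED,0,Aeroflot,2020-05-01T10:00:00,95,1200,KZN,2020-05-10,True,abc123"
print(csv_line_to_json(sample))
```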

We restart the service and make sure it is up and running:

sudo service aws-kinesis-agent restart

Now let's download the Python script that requests data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and saves the received response in the directory monitored by the Kinesis agent. The implementation of this script is straightforward: there is a TicketsApi class that lets you call the API asynchronously. We pass the header with the token and the request parameters to this class:

import asyncio
import sys

from aiohttp import ClientResponseError, ClientSession

# BASE_URL, CURRENCY, ORIGIN, DESTINATION, SHOW_TO_AFFILIATES, TRIP_DURATION,
# LOGGER, TARGET_FILE and log_maker() are defined elsewhere in api_caller.py.


class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, params):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                # GET parameters belong in the query string, hence params=.
                async with session.get(self.base_url,
                                       params=params) as response:
                    # Raises aiohttp.ClientResponseError on 4xx/5xx statuses.
                    response.raise_for_status()
                    LOGGER.info('Response status %s: %s',
                                self.base_url, response.status)
                    response_json = await response.json()
            except ClientResponseError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
        return response_json


def prepare_request(api_token):
    """Return the headers and query parameters for the API request."""
    # The token is sent in a header instead of the query string.
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    params = {'currency': CURRENCY,
              'origin': ORIGIN,
              'destination': DESTINATION,
              'show_to_affiliates': SHOW_TO_AFFILIATES,
              'trip_duration': TRIP_DURATION}
    return headers, params


async def main():
    """Run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, params = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(params)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)


if __name__ == '__main__':
    # asyncio.run() needs Python 3.7+, while the instance runs python36.
    asyncio.get_event_loop().run_until_complete(main())
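The excerpt above calls `log_maker()` without showing it. Purely as an illustration (not the author's implementation), here is a hypothetical writer that turns API items into CSV lines in the column order of `customFieldNames`. It assumes the API's `value` field is the ticket cost and generates a random `record_id`, since the sample response has no natural key; fields missing from a response are written as empty strings. The default file name is also an assumption; it only needs to end in log to match the agent's filePattern.

```python
import csv
import io
import uuid

# Column order must match customFieldNames in agent.json.
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]


def ticket_rows(items):
    """Turn API items into CSV lines in the agent's column order."""
    buffer = io.StringIO()
    # "\n" terminators avoid a stray "\r" ending up in the last field.
    writer = csv.writer(buffer, lineterminator="\n")
    for item in items:
        # Assumptions: 'value' is the cost; record_id is a random UUID.
        row = dict(item, cost=item.get("value"), record_id=uuid.uuid4().hex)
        writer.writerow([row.get(name, "") for name in FIELD_NAMES])
    return buffer.getvalue().splitlines()


def append_rows(lines, path="/var/log/airline_tickets/tickets.log"):
    """Append lines to a file watched by the Kinesis Agent."""
    with open(path, "a") as log_file:
        for line in lines:
            log_file.write(line + "\n")
    return len(lines)
```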

To test the settings and the agent, let's run the api_caller.py script:

sudo ./api_caller.py TOKEN

Then we check the result in the agent logs and on the Monitoring tab of the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
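Besides the agent log and the Monitoring tab, you can read a few records back from the stream to check their shape. A minimal sketch assuming a boto3 Kinesis client and a single-shard stream such as airline_tickets; `peek_records` is a hypothetical helper.

```python
import json


def peek_records(kinesis, stream="airline_tickets", limit=5, attempts=5):
    """Return up to `limit` decoded records from the stream's first shard.

    `kinesis` is expected to be a boto3 Kinesis client. Reading starts at
    TRIM_HORIZON (the oldest retained record). GetRecords can return an
    empty batch even when data exists further on, so a few calls are made.
    """
    shards = kinesis.describe_stream(StreamName=stream)["StreamDescription"]["Shards"]
    iterator = kinesis.get_shard_iterator(
        StreamName=stream,
        ShardId=shards[0]["ShardId"],
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]

    records = []
    for _ in range(attempts):
        response = kinesis.get_records(ShardIterator=iterator, Limit=limit)
        # boto3 returns Data as bytes that are already base64-decoded.
        records.extend(json.loads(r["Data"]) for r in response["Records"])
        iterator = response.get("NextShardIterator")
        if len(records) >= limit or not iterator:
            break
    return records[:limit]
```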

As you can see, everything works and the Kinesis Agent successfully sends data to the stream. Now let's set up the consumer.

Creating a Kinesis Data Analytics application

Let's move on to the central component of the whole system: create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app:

Kinesis Data Analytics lets you run real-time analytics on data from Kinesis Streams using SQL. It is a fully autoscaling service (unlike Kinesis Streams) that:

  1. lets you create new streams (Output Stream) based on queries over the source data;
  2. provides a stream of errors that occurred while applications were running (Error Stream);
  3. can automatically detect the input data schema (which can be redefined manually if necessary).

This is not a cheap service: 0.11 USD per hour of operation, so use it carefully and delete it when you're done.

Let's connect the application to the data source:

Select the stream we will connect to (airline_tickets):

Next, you need to attach a new IAM role so that the application can read from the stream and write to a stream. To do this, it's enough to leave the Access permissions block unchanged:

Now let's request discovery of the data schema in the stream by clicking the "Discover schema" button. As a result, the IAM role will be updated (a new one will be created), and schema discovery will run on the data that has already arrived in the stream:

Now go to the SQL editor. When you click this button, a window appears asking whether to start the application; choose to start it:

Paste the following simple query into the SQL editor window and click Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases, you work with tables, using INSERT statements to add records and SELECT statements to query data. In Amazon Kinesis Data Analytics, you work with streams (STREAMs) and pumps (PUMPs): continuous insert queries that move data from one in-application stream into another.

The SQL query above looks for Aeroflot tickets costing less than five thousand rubles. All records matching these conditions are placed into the DESTINATION_SQL_STREAM stream.
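To make the pump's semantics concrete, here is a plain-Python equivalent of the same filter applied to a finite list of rows (the real pump runs continuously over the stream). It assumes each source row carries `cost` and `gate` keys as in the discovered schema; `destination_sql_stream` is a hypothetical name.

```python
def destination_sql_stream(source_rows):
    """Yield (cost, gate) rows for Aeroflot tickets cheaper than 5000.

    Mirrors: SELECT STREAM "cost", "gate" FROM "SOURCE_SQL_STREAM_001"
             WHERE "cost" < 5000 AND "gate" = 'Aeroflot'
    """
    for row in source_rows:
        cost = float(row["cost"])  # the output column is declared DOUBLE
        if cost < 5000 and row["gate"] == "Aeroflot":
            yield {"cost": cost, "gate": row["gate"]}


rows = [
    {"cost": "4500.0", "gate": "Aeroflot"},  # passes both conditions
    {"cost": "6000", "gate": "Aeroflot"},    # too expensive
    {"cost": "3000", "gate": "S7"},          # different gate
]
print(list(destination_sql_stream(rows)))  # [{'cost': 4500.0, 'gate': 'Aeroflot'}]
```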

In the Destination section, select the special_stream stream, and in the In-application stream name drop-down, select DESTINATION_SQL_STREAM:

The result of all these steps should look like the picture below:


Creating and subscribing to an SNS topic

Go to Simple Notification Service and create a new topic there named Airlines:

Subscribe to this topic and specify the mobile phone number to which SMS notifications will be sent:


Creating a table in DynamoDB

To store the raw data from the airline_tickets stream, let's create a DynamoDB table with the same name. We'll use record_id as the primary key:
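The table can also be created from code. A sketch assuming a boto3 DynamoDB client and a string `record_id`; the 5/5 provisioned capacity is an assumption chosen to stay within the free-tier limits mentioned earlier, and `create_tickets_table` is a hypothetical helper.

```python
def create_tickets_table(dynamodb, name="airline_tickets"):
    """Create the raw-data table with record_id as the partition (HASH) key.

    `dynamodb` is expected to be a boto3 DynamoDB client, e.g.
    create_tickets_table(boto3.client("dynamodb")).
    """
    dynamodb.create_table(
        TableName=name,
        # Only key attributes are declared; other ticket fields are schemaless.
        AttributeDefinitions=[{"AttributeName": "record_id", "AttributeType": "S"}],
        KeySchema=[{"AttributeName": "record_id", "KeyType": "HASH"}],
        ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
    )
    # Block until the table is ACTIVE and ready for the Collector Lambda.
    dynamodb.get_waiter("table_exists").wait(TableName=name)
    return name
```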


Creating the collector Lambda function

Let's create a Lambda function named Collector, whose job is to poll the airline_tickets stream and, when new records appear there, insert them into the DynamoDB table. Obviously, in addition to the default permissions, this Lambda needs read access to the Kinesis data stream and write access to DynamoDB.

Creating an IAM role for the collector Lambda function
First, let's create a new IAM role for the Lambda named Lambda-TicketsProcessingRole:

For a test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable, as shown in the picture below:


This Lambda should be triggered by Kinesis when new records arrive in airline_tickets, so we need to add a new trigger:

All that remains is to paste the code and save the Lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(str(json_item.get('cost'))),  # str() avoids float precision errors
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Added', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
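To try the Collector without waiting for the agent, you can feed it a hand-made event. This sketch builds only the fields `TicketsParser` reads (`Records[].kinesis.data`, base64-encoded JSON); real Kinesis events carry more metadata, such as partition keys and sequence numbers. Both helpers are hypothetical.

```python
import base64
import json


def make_kinesis_event(items):
    """Build a minimal Kinesis-to-Lambda event for local testing."""
    return {
        "Records": [
            {"kinesis": {"data": base64.b64encode(json.dumps(item).encode()).decode()}}
            for item in items
        ]
    }


def decode_event(event):
    """Mirror TicketsParser.get_json_data: base64-decode, then parse JSON."""
    return [json.loads(base64.b64decode(r["kinesis"]["data"]))
            for r in event["Records"]]
```

The output of `json.dumps(make_kinesis_event([...]))` can be pasted as a test event in the Lambda console.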

Creating the notifier Lambda function

The second Lambda function, which monitors the second stream (special_stream) and sends a notification to SNS, is created in the same way. This Lambda therefore needs permission to read from Kinesis and to publish messages to the given SNS topic; the SNS service then delivers them to all subscribers of the topic (email, SMS, and so on).

Creating the IAM role
First, we create the IAM role Lambda-KinesisAlarm for this Lambda, and then assign this role to the alarm_notifier Lambda being created:


This Lambda should be triggered when new records enter special_stream, so configure the trigger the same way we did for the Collector Lambda.

To make this Lambda easier to configure, let's introduce a new environment variable, TOPIC_ARN, which holds the ARN (Amazon Resource Name) of the Airlines topic:

Then paste the Lambda code; it's not complicated at all:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))
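The handler above sends the same text every time. If you want the SMS to say what was found, one option is to decode the special_stream records first. This sketch assumes the Kinesis Analytics destination writes JSON records with `cost` and `gate` keys; `build_alarm_message` and `notify` are hypothetical helpers, and `sns_client` is expected to be `boto3.client('sns')`.

```python
import base64
import json


def build_alarm_message(records):
    """Summarize special_stream records (gate and cost) for the SMS text."""
    lines = []
    for record in records:
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        lines.append(f"{payload.get('gate')}: {payload.get('cost')} RUB")
    return "Cheap tickets found! " + "; ".join(lines)


def notify(sns_client, topic_arn, event):
    """Publish one message per invocation and return the SNS MessageId."""
    response = sns_client.publish(
        TopicArn=topic_arn,
        Message=build_alarm_message(event["Records"]),
        Subject="Airline tickets alarm",
    )
    return response["MessageId"]
```

Inside `lambda_handler`, this would become `notify(SNS_CLIENT, TOPIC_ARN, event)`.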

That completes the manual system configuration. All that remains is to test it and make sure we've configured everything correctly.

Deploying from Terraform code

Required preparation

Terraform is a very convenient open-source tool for deploying infrastructure from code. It has its own syntax that is easy to learn, and there are plenty of examples of how and what to deploy. The Atom editor and Visual Studio Code have many handy plugins that make working with Terraform easier.

You can download the distribution here. A detailed review of all of Terraform's capabilities is beyond the scope of this article, so we will limit ourselves to the main points.

Getting started

The full project code is in my repository; clone it locally. Before you start, make sure you have the AWS CLI installed and configured, because Terraform will look for credentials in the ~/.aws/credentials file.

It's good practice to run the plan command before deploying the whole infrastructure, to see what Terraform is going to create for us in the cloud:

terraform.exe plan

You will be prompted to enter a phone number to send notifications to. At this stage, you don't have to enter it.

After reviewing the execution plan, we can start creating the resources:

terraform.exe apply

After you run this command, you will again be asked for the phone number; type "yes" when asked whether to actually perform the actions. This sets up the whole infrastructure, performs all the necessary EC2 configuration, deploys the Lambda functions, and so on.

After all the resources have been created successfully through the Terraform code, you need to go into the Kinesis Analytics application settings (unfortunately, I didn't find a way to do this directly from code).

Start the application:

After that, you must explicitly set the in-application stream name by selecting it from the drop-down list:

Now everything is ready to go.

Testing the application

However you deployed the system, manually or via Terraform code, it will work the same way.

We connect via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:

sudo ./api_caller.py TOKEN

All you have to do is wait for an SMS to your number:

The SMS arrives on the phone within about a minute:

It remains to check whether the records were saved in the DynamoDB database for later, more detailed analysis. The airline_tickets table contains roughly the following data:
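To check the row count without the console, a paginated Scan with `Select='COUNT'` works; note that it reads the whole table and consumes read capacity. The `ItemCount` returned by DescribeTable is cheaper but is only refreshed periodically. A sketch assuming a boto3 DynamoDB client; `count_items` is a hypothetical helper.

```python
def count_items(dynamodb, table="airline_tickets"):
    """Count all items in the table with paginated COUNT scans.

    `dynamodb` is expected to be a boto3 DynamoDB client.
    """
    total = 0
    kwargs = {"TableName": table, "Select": "COUNT"}
    while True:
        page = dynamodb.scan(**kwargs)
        total += page["Count"]
        # A page without LastEvaluatedKey is the last one.
        if "LastEvaluatedKey" not in page:
            return total
        kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]
```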


Conclusion

In the course of this work, we built an online data processing system based on Amazon Kinesis. We looked at using the Kinesis Agent together with Kinesis Data Streams, at real-time analytics with Kinesis Analytics using SQL commands, and at how Amazon Kinesis interacts with other AWS services.

We deployed this system in two ways: a rather long manual one and a quick one from Terraform code.

All the project source code is available in my GitHub repository; I suggest you take a look at it.

I'm happy to discuss the article and look forward to your comments. Constructive criticism is welcome.

I wish you success!

Source: www.habr.com
