Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server

Hoy Habr!

Gusto mo ba ng lumilipad na eroplano? Gustung-gusto ko ito, ngunit sa panahon ng pag-iisa sa sarili, nagustuhan ko rin ang pagsusuri ng data sa mga air ticket mula sa isang kilalang mapagkukunan - Aviasales.

Ngayon ay susuriin namin ang gawain ng Amazon Kinesis, bubuo ng streaming system na may real-time na analytics, i-install ang database ng Amazon DynamoDB NoSQL bilang pangunahing imbakan ng data, at i-set up ang mga notification sa SMS para sa mga kawili-wiling tiket.

Ang lahat ng mga detalye ay nasa ilalim ng hiwa! Go!

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server

Pagpapakilala

Para sa halimbawa, kailangan namin ng access sa Aviasales API. Ang pag-access dito ay ibinibigay nang walang bayad at walang mga paghihigpit; kailangan mo lamang na magparehistro sa seksyong "Mga Nag-develop" upang matanggap ang iyong token ng API upang ma-access ang data.

Ang pangunahing layunin ng artikulong ito ay magbigay ng pangkalahatang pag-unawa sa paggamit ng streaming ng impormasyon sa AWS; isinasaalang-alang namin na ang data na ibinalik ng API na ginamit ay hindi mahigpit na napapanahon at ipinadala mula sa cache, na kung saan ay nabuo batay sa mga paghahanap ng mga gumagamit ng Aviasales.ru at Jetradar.com na mga site sa huling 48 oras.

Ang Kinesis-agent, na naka-install sa makinang gumagawa, na natanggap sa pamamagitan ng API ay awtomatikong mag-parse at magpapadala ng data sa gustong stream sa pamamagitan ng Kinesis Data Analytics. Direktang isusulat sa tindahan ang raw na bersyon ng stream na ito. Ang raw data storage na naka-deploy sa DynamoDB ay magbibigay-daan para sa mas malalim na pagsusuri ng ticket sa pamamagitan ng BI tool, gaya ng AWS Quick Sight.

Isasaalang-alang namin ang dalawang opsyon para sa pag-deploy ng buong imprastraktura:

  • Manwal - sa pamamagitan ng AWS Management Console;
  • Ang imprastraktura mula sa Terraform code ay para sa mga tamad na automators;

Arkitektura ng binuong sistema

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Mga bahaging ginamit:

  • Aviasales API β€” ang data na ibinalik ng API na ito ay gagamitin para sa lahat ng kasunod na gawain;
  • Halimbawa ng Producer ng EC2 β€” isang regular na virtual machine sa cloud kung saan bubuo ang input data stream:
    • Ahente ng Kinesis ay isang Java application na lokal na naka-install sa makina na nagbibigay ng madaling paraan upang mangolekta at magpadala ng data sa Kinesis (Kinesis Data Streams o Kinesis Firehose). Ang ahente ay patuloy na sinusubaybayan ang isang set ng mga file sa tinukoy na mga direktoryo at nagpapadala ng bagong data sa Kinesis;
    • API Caller Script β€” Isang script ng Python na gumagawa ng mga kahilingan sa API at inilalagay ang tugon sa isang folder na sinusubaybayan ng Kinesis Agent;
  • Kinesis Data Stream β€” real-time na serbisyo sa streaming ng data na may malawak na kakayahan sa pag-scale;
  • Kinesis Analytics ay isang walang server na serbisyo na pinapasimple ang pagsusuri ng streaming data sa real time. Kino-configure ng Amazon Kinesis Data Analytics ang mga mapagkukunan ng application at awtomatikong nagsusukat upang mahawakan ang anumang dami ng papasok na data;
  • AWS Lambda β€” isang serbisyong nagbibigay-daan sa iyong magpatakbo ng code nang hindi nagba-back up o nagse-set up ng mga server. Ang lahat ng kapangyarihan sa pag-compute ay awtomatikong na-scale para sa bawat tawag;
  • Amazon DynamoDB - Isang database ng mga pares ng key-value at mga dokumento na nagbibigay ng latency na mas mababa sa 10 millisecond kapag tumatakbo sa anumang sukat. Kapag gumagamit ng DynamoDB, hindi na kailangang magbigay, mag-patch, o pamahalaan ang anumang mga server. Awtomatikong sinusuri ng DynamoDB ang mga talahanayan upang isaayos ang dami ng magagamit na mapagkukunan at mapanatili ang mataas na pagganap. Walang pangangasiwa ng system ang kinakailangan;
  • Amazon SNS - isang ganap na pinamamahalaang serbisyo para sa pagpapadala ng mga mensahe gamit ang modelo ng publisher-subscriber (Pub/Sub), kung saan maaari mong ihiwalay ang mga microservice, distributed system at serverless na application. Maaaring gamitin ang SNS upang magpadala ng impormasyon sa mga end user sa pamamagitan ng mga mobile push notification, SMS message at email.

Paunang pagsasanay

Upang tularan ang daloy ng data, nagpasya akong gamitin ang impormasyon ng tiket sa eroplano na ibinalik ng Aviasales API. SA dokumentasyon isang malawak na listahan ng iba't ibang mga pamamaraan, kunin natin ang isa sa mga ito - "Kalendaryo ng Buwanang Presyo", na nagbabalik ng mga presyo para sa bawat araw ng buwan, na nakagrupo ayon sa bilang ng mga paglilipat. Kung hindi mo tinukoy ang buwan ng paghahanap sa kahilingan, ibabalik ang impormasyon para sa buwan kasunod ng kasalukuyang buwan.

Kaya, magrehistro tayo at kunin ang ating token.

Ang isang halimbawa ng kahilingan ay nasa ibaba:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Ang paraan sa itaas ng pagtanggap ng data mula sa API sa pamamagitan ng pagtukoy ng isang token sa kahilingan ay gagana, ngunit mas gusto kong ipasa ang access token sa pamamagitan ng header, kaya gagamitin namin ang paraang ito sa api_caller.py script.

Halimbawa ng sagot:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Ang halimbawang tugon ng API sa itaas ay nagpapakita ng tiket mula sa St. Petersburg hanggang Phuk... Oh, magandang panaginip...
Dahil ako ay mula sa Kazan, at ang Phuket ay "panaginip lamang" ngayon, maghanap tayo ng mga tiket mula St. Petersburg hanggang Kazan.

Ipinapalagay nito na mayroon ka nang AWS account. Nais kong agad na gumuhit ng espesyal na pansin sa katotohanan na ang Kinesis at pagpapadala ng mga abiso sa pamamagitan ng SMS ay hindi kasama sa taunang Libreng Tier (libreng paggamit). Ngunit kahit na sa kabila nito, na may ilang mga dolyar sa isip, ito ay lubos na posible na bumuo ng iminungkahing sistema at paglaruan ito. At, siyempre, huwag kalimutang tanggalin ang lahat ng mga mapagkukunan pagkatapos na hindi na sila kailangan.

Sa kabutihang palad, ang mga function ng DynamoDb at lambda ay magiging libre para sa amin kung matutugunan namin ang aming mga buwanang libreng limitasyon. Halimbawa, para sa DynamoDB: 25 GB ng storage, 25 WCU/RCU at 100 milyong query. At isang milyong lambda function na tawag bawat buwan.

Manu-manong pag-deploy ng system

Pagse-set up ng Kinesis Data Streams

Pumunta tayo sa serbisyo ng Kinesis Data Streams at lumikha ng dalawang bagong stream, isang shard para sa bawat isa.

Ano ang shard?
Ang shard ay ang pangunahing data transfer unit ng isang Amazon Kinesis stream. Nagbibigay ang isang segment ng paglipat ng data ng input sa bilis na 1 MB/s at paglilipat ng data ng output sa bilis na 2 MB/s. Sinusuportahan ng isang segment ang hanggang 1000 entry sa PUT bawat segundo. Kapag gumagawa ng stream ng data, kailangan mong tukuyin ang kinakailangang bilang ng mga segment. Halimbawa, maaari kang lumikha ng stream ng data na may dalawang segment. Ang data stream na ito ay magbibigay ng input data transfer sa 2 MB/s at output ng data transfer sa 4 MB/s, na sumusuporta sa hanggang 2000 PUT record bawat segundo.

Ang mas maraming shards sa iyong stream, mas malaki ang throughput nito. Sa prinsipyo, ito ay kung paano nasusukat ang mga daloy - sa pamamagitan ng pagdaragdag ng mga shards. Ngunit kung mas maraming shards ang mayroon ka, mas mataas ang presyo. Ang bawat shard ay nagkakahalaga ng 1,5 cents kada oras at karagdagang 1.4 cents para sa bawat milyong PUT payload units.

Gumawa tayo ng bagong stream na may pangalan airline_tickets, 1 shard ay sapat na para sa kanya:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Ngayon gumawa tayo ng isa pang thread na may pangalan special_stream:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server

Setup ng producer

Upang pag-aralan ang isang gawain, sapat na gumamit ng regular na instance ng EC2 bilang isang producer ng data. Hindi ito kailangang maging isang makapangyarihan, mamahaling virtual machine; ang isang spot t2.micro ay magiging maayos.

Mahalagang tala: halimbawa, dapat mong gamitin ang imahe - Amazon Linux AMI 2018.03.0, mayroon itong mas kaunting mga setting para sa mabilis na paglulunsad ng Kinesis Agent.

Pumunta sa serbisyo ng EC2, lumikha ng bagong virtual machine, piliin ang nais na AMI na may uri na t2.micro, na kasama sa Libreng Tier:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Upang ang bagong likhang virtual machine ay maaaring makipag-ugnayan sa serbisyo ng Kinesis, dapat itong bigyan ng mga karapatan na gawin ito. Ang pinakamahusay na paraan upang gawin ito ay ang magtalaga ng Tungkulin ng IAM. Samakatuwid, sa Hakbang 3: I-configure ang Mga Detalye ng Instance screen, dapat kang pumili Gumawa ng bagong Tungkulin ng IAM:

Paglikha ng tungkulin ng IAM para sa EC2
Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Sa window na bubukas, piliin na gumagawa kami ng bagong tungkulin para sa EC2 at pumunta sa seksyong Mga Pahintulot:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Gamit ang halimbawa ng pagsasanay, hindi namin kailangang pumunta sa lahat ng sali-salimuot ng butil na pagsasaayos ng mga karapatan sa mapagkukunan, kaya pipiliin namin ang mga patakarang na-pre-configure ng Amazon: AmazonKinesisFullAccess at CloudWatchFullAccess.

Magbigay tayo ng ilang makabuluhang pangalan para sa tungkuling ito, halimbawa: EC2-KinesisStreams-FullAccess. Ang resulta ay dapat na kapareho ng ipinapakita sa larawan sa ibaba:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Matapos gawin ang bagong tungkuling ito, huwag kalimutang ilakip ito sa nilikhang virtual machine na halimbawa:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Wala kaming binago sa screen na ito at lumipat sa susunod na mga window.

Ang mga setting ng hard drive ay maaaring iwanang default, pati na rin ang mga tag (bagama't magandang kasanayan na gumamit ng mga tag, bigyan man lang ng pangalan ang instance at ipahiwatig ang kapaligiran).

Ngayon ay nasa Step 6 na tayo: tab na I-configure ang Security Group, kung saan kailangan mong lumikha ng bago o tukuyin ang iyong umiiral na Security group, na nagbibigay-daan sa iyong kumonekta sa pamamagitan ng ssh (port 22) sa instance. Piliin ang Source -> My IP doon at maaari mong ilunsad ang instance.

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Sa sandaling lumipat ito sa running status, maaari mong subukang kumonekta dito sa pamamagitan ng ssh.

Upang makapagtrabaho sa Kinesis Agent, pagkatapos na matagumpay na kumonekta sa makina, dapat mong ipasok ang mga sumusunod na command sa terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Gumawa tayo ng folder para i-save ang mga tugon ng API:

sudo mkdir /var/log/airline_tickets

Bago simulan ang ahente, kailangan mong i-configure ang config nito:

sudo vim /etc/aws-kinesis/agent.json

Ang mga nilalaman ng agent.json file ay dapat magmukhang ganito:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Gaya ng makikita mula sa configuration file, susubaybayan ng ahente ang mga file na may extension na .log sa /var/log/airline_tickets/ direktoryo, i-parse ang mga ito at ililipat ang mga ito sa stream ng airline_tickets.

I-restart namin ang serbisyo at tinitiyak na ito ay gumagana at tumatakbo:

sudo service aws-kinesis-agent restart

Ngayon, i-download natin ang script ng Python na hihiling ng data mula sa API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Ang script ng api_caller.py ay humihiling ng data mula sa Aviasales at sine-save ang natanggap na tugon sa direktoryo na ini-scan ng ahente ng Kinesis. Ang pagpapatupad ng script na ito ay medyo pamantayan, mayroong isang klase ng TicketsApi, pinapayagan ka nitong asynchronously hilahin ang API. Nagpapasa kami ng header na may token at humiling ng mga parameter sa klase na ito:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Upang subukan ang mga tamang setting at functionality ng ahente, subukan nating patakbuhin ang api_caller.py script:

sudo ./api_caller.py TOKEN

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
At tinitingnan namin ang resulta ng trabaho sa mga log ng Ahente at sa tab na Pagsubaybay sa stream ng data ng airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Gaya ng nakikita mo, gumagana ang lahat at matagumpay na naipadala ng Kinesis Agent ang data sa stream. Ngayon ay i-configure natin ang consumer.

Pagse-set up ng Kinesis Data Analytics

Lumipat tayo sa gitnang bahagi ng buong system - lumikha ng bagong application sa Kinesis Data Analytics na pinangalanang kinesis_analytics_airlines_app:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Binibigyang-daan ka ng Kinesis Data Analytics na magsagawa ng real-time na data analytics mula sa Kinesis Streams gamit ang wikang SQL. Ito ay isang ganap na autoscaling na serbisyo (hindi tulad ng Kinesis Streams) na:

  1. nagbibigay-daan sa iyong lumikha ng mga bagong stream (Output Stream) batay sa mga kahilingan sa source data;
  2. nagbibigay ng stream na may mga error na naganap habang tumatakbo ang mga application (Error Stream);
  3. maaaring awtomatikong matukoy ang scheme ng data ng input (maaari itong manu-manong muling tukuyin kung kinakailangan).

Ito ay hindi isang murang serbisyo - 0.11 USD bawat oras ng trabaho, kaya dapat mong gamitin ito nang maingat at tanggalin ito kapag tapos ka na.

Ikonekta natin ang application sa data source:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Piliin ang stream kung saan kami ikokonekta (airline_tickets):

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Susunod, kailangan mong mag-attach ng bagong Tungkulin ng IAM upang makapagbasa ang application mula sa stream at makapagsulat sa stream. Upang gawin ito, sapat na na huwag baguhin ang anuman sa block ng mga pahintulot sa Pag-access:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Ngayon, hilingin natin ang pagtuklas ng data schema sa stream; para gawin ito, mag-click sa button na "Discover schema". Bilang resulta, ang tungkulin ng IAM ay maa-update (isang bago ang gagawin) at ang schema detection ay ilulunsad mula sa data na dumating na sa stream:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Ngayon ay kailangan mong pumunta sa editor ng SQL. Kapag nag-click ka sa button na ito, lalabas ang isang window na humihiling sa iyong ilunsad ang application - piliin kung ano ang gusto mong ilunsad:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Ipasok ang sumusunod na simpleng query sa window ng SQL editor at i-click ang I-save at Patakbuhin ang SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Sa mga relational na database, nagtatrabaho ka sa mga talahanayan gamit ang mga INSERT na pahayag upang magdagdag ng mga tala at isang SELECT statement upang mag-query ng data. Sa Amazon Kinesis Data Analytics, nagtatrabaho ka sa mga stream (STREAMs) at pumps (PUMPs)β€”mga patuloy na kahilingan sa pagpasok na naglalagay ng data mula sa isang stream sa isang application papunta sa isa pang stream.

Ang SQL query na ipinakita sa itaas ay naghahanap ng mga tiket sa Aeroflot sa halagang mas mababa sa limang libong rubles. Ang lahat ng mga tala na nakakatugon sa mga kundisyong ito ay ilalagay sa DESTINATION_SQL_STREAM stream.

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Sa Destination block, piliin ang special_stream stream, at sa In-application stream name DESTINATION_SQL_STREAM drop-down list:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Ang resulta ng lahat ng manipulasyon ay dapat na katulad ng larawan sa ibaba:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server

Paglikha at pag-subscribe sa isang paksa ng SNS

Pumunta sa Simple Notification Service at lumikha ng bagong paksa doon na may pangalang Airlines:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Mag-subscribe sa paksang ito at ipahiwatig ang numero ng mobile phone kung saan ipapadala ang mga abiso sa SMS:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server

Lumikha ng talahanayan sa DynamoDB

Para i-store ang raw data mula sa kanilang airline_tickets stream, gumawa tayo ng table sa DynamoDB na may parehong pangalan. Gagamitin namin ang record_id bilang pangunahing susi:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server

Paglikha ng isang kolektor ng function ng lambda

Gumawa tayo ng lambda function na tinatawag na Collector, na ang gawain ay ang pag-poll sa airline_tickets stream at, kung may mga bagong record na makikita doon, ipasok ang mga record na ito sa DynamoDB table. Malinaw, bilang karagdagan sa mga default na karapatan, ang lambda na ito ay dapat na may read access sa Kinesis data stream at write access sa DynamoDB.

Paglikha ng tungkulin ng IAM para sa function ng collector lambda
Una, gumawa tayo ng bagong tungkulin ng IAM para sa lambda na pinangalanang Lambda-TicketsProcessingRole:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Para sa halimbawa ng pagsubok, ang paunang na-configure na mga patakaran ng AmazonKinesisReadOnlyAccess at AmazonDynamoDBFullAccess ay angkop, tulad ng ipinapakita sa larawan sa ibaba:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server

Ang lambda na ito ay dapat na ilunsad sa pamamagitan ng trigger mula sa Kinesis kapag pumasok ang mga bagong entry sa airline_stream, kaya kailangan nating magdagdag ng bagong trigger:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Ang natitira na lang ay ipasok ang code at i-save ang lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Paglikha ng lambda function notifier

Ang pangalawang lambda function, na susubaybay sa pangalawang stream (special_stream) at magpapadala ng notification sa SNS, ay ginawa sa katulad na paraan. Samakatuwid, ang lambda na ito ay dapat magkaroon ng access upang magbasa mula sa Kinesis at magpadala ng mga mensahe sa isang partikular na paksa ng SNS, na pagkatapos ay ipapadala ng serbisyo ng SNS sa lahat ng mga subscriber ng paksang ito (email, SMS, atbp.).

Paglikha ng tungkulin ng IAM
Una, gagawin namin ang tungkulin ng IAM na Lambda-KinesisAlarm para sa lambda na ito, at pagkatapos ay italaga ang tungkuling ito sa alarm_notifier lambda na ginagawa:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server

Ang lambda na ito ay dapat gumana sa isang trigger para sa mga bagong tala na makapasok sa special_stream, kaya kailangan mong i-configure ang trigger sa parehong paraan tulad ng ginawa namin para sa Collector lambda.

Para mas madaling i-configure ang lambda na ito, magpakilala tayo ng bagong environment variable - TOPIC_ARN, kung saan inilalagay natin ang ANR (Amazon Recourse Names) ng paksa ng Airlines:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
At ipasok ang lambda code, hindi ito kumplikado:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Tila dito nakumpleto ang manu-manong pagsasaayos ng system. Ang natitira na lang ay subukan at tiyaking na-configure namin nang tama ang lahat.

I-deploy mula sa Terraform code

Kinakailangang paghahanda

Terraform ay isang napaka-maginhawang open-source na tool para sa pag-deploy ng imprastraktura mula sa code. Mayroon itong sariling syntax na madaling matutunan at maraming halimbawa kung paano at kung ano ang i-deploy. Ang Atom editor o Visual Studio Code ay may maraming madaling gamiting plugin na nagpapadali sa pagtatrabaho sa Terraform.

Maaari mong i-download ang pamamahagi kaya. Ang isang detalyadong pagsusuri ng lahat ng kakayahan ng Terraform ay lampas sa saklaw ng artikulong ito, kaya lilimitahan namin ang aming sarili sa mga pangunahing punto.

Paano magsimula

Ang buong code ng proyekto ay sa aking imbakan. Kino-clone namin ang repository sa aming sarili. Bago magsimula, kailangan mong tiyakin na mayroon kang AWS CLI na naka-install at na-configure, dahil... Maghahanap ang Terraform ng mga kredensyal sa ~/.aws/credentials file.

Ang isang magandang kasanayan ay ang patakbuhin ang utos ng plano bago i-deploy ang buong imprastraktura upang makita kung ano ang kasalukuyang ginagawa ng Terraform para sa amin sa cloud:

terraform.exe plan

Ipo-prompt kang maglagay ng numero ng telepono kung saan papadalhan ng mga notification. Hindi kinakailangan na ipasok ito sa yugtong ito.

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Matapos masuri ang plano ng pagpapatakbo ng programa, maaari tayong magsimulang lumikha ng mga mapagkukunan:

terraform.exe apply

Pagkatapos ipadala ang utos na ito, hihilingin muli sa iyong maglagay ng numero ng telepono; i-dial ang "oo" kapag ang isang tanong tungkol sa aktwal na pagsasagawa ng mga aksyon ay ipinapakita. Papayagan ka nitong i-set up ang buong imprastraktura, isagawa ang lahat ng kinakailangang pagsasaayos ng EC2, i-deploy ang mga function ng lambda, atbp.

Matapos ang lahat ng mga mapagkukunan ay matagumpay na nalikha sa pamamagitan ng Terraform code, kailangan mong pumunta sa mga detalye ng Kinesis Analytics application (sa kasamaang-palad, hindi ko nakita kung paano ito gawin nang direkta mula sa code).

Ilunsad ang application:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Pagkatapos nito, dapat mong tahasang itakda ang pangalan ng in-application na stream sa pamamagitan ng pagpili mula sa drop-down na listahan:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Ngayon ay handa na ang lahat.

Pagsubok sa aplikasyon

Hindi alintana kung paano mo na-deploy ang system, nang manu-mano o sa pamamagitan ng Terraform code, gagana rin ito.

Nag-log in kami sa pamamagitan ng SSH sa EC2 virtual machine kung saan naka-install ang Kinesis Agent at pinapatakbo ang api_caller.py script

sudo ./api_caller.py TOKEN

Ang kailangan mo lang gawin ay maghintay ng SMS sa iyong numero:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
SMS - dumating ang isang mensahe sa telepono sa halos 1 minuto:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server
Ito ay nananatiling upang makita kung ang mga tala ay nai-save sa database ng DynamoDB para sa kasunod, mas detalyadong pagsusuri. Ang talahanayan ng airline_tickets ay naglalaman ng humigit-kumulang sumusunod na data:

Pagsasama ng Aviasales API sa Amazon Kinesis at pagiging simple ng walang server

Konklusyon

Sa kurso ng gawaing ginawa, isang online na sistema ng pagpoproseso ng data ay binuo batay sa Amazon Kinesis. Isinaalang-alang ang mga opsyon para sa paggamit ng Kinesis Agent kasabay ng Kinesis Data Streams at real-time analytics Kinesis Analytics gamit ang mga SQL command, pati na rin ang pakikipag-ugnayan ng Amazon Kinesis sa iba pang mga serbisyo ng AWS.

Na-deploy namin ang system sa itaas sa dalawang paraan: isang medyo mahabang manual at isang mabilis mula sa Terraform code.

Available ang lahat ng source code ng proyekto sa aking GitHub repository, Iminumungkahi kong maging pamilyar ka dito.

Natutuwa akong talakayin ang artikulo, inaasahan ko ang iyong mga komento. Umaasa ako para sa constructive criticism.

Nais kong tagumpay ka!

Pinagmulan: www.habr.com

Magdagdag ng komento