Aviasales API integration with Amazon Kinesis and serverless simplicity

Hi Habr!

Do you like flying? I love it, but during self-isolation I also fell in love with analyzing air ticket data from one well-known resource: Aviasales.

Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, set up an Amazon DynamoDB NoSQL database as the main data store, and configure SMS notifications for interesting tickets.

All the details are under the cut! Let's go!


Introduction

For this example, we need access to the Aviasales API. Access is free and unrestricted. You only need to register in the "Developers" section to get an API token for the data.

The main goal of this article is to give a general understanding of working with streaming data in AWS. Keep in mind that the data returned by the API is not strictly up to date. It is served from a cache built from searches made by users of the Aviasales.ru and Jetradar.com sites over the last 48 hours.

The Kinesis Agent runs on the producer machine. It will automatically parse the data received from the API and send it to the stream that Kinesis Data Analytics reads. The raw version of this stream will be written directly to the data store. The raw data stored in DynamoDB will allow deeper ticket analysis with BI tools such as Amazon QuickSight.

We will consider two options for deploying the whole infrastructure:

  • Manual, via the AWS Management Console;
  • Infrastructure from Terraform code, for lazy automators.

Architecture of the system

Components used:

  • Aviasales API: the data returned by this API is used for all the subsequent work;
  • EC2 Producer Instance: a regular virtual machine in the cloud that generates the input data stream:
    • Kinesis Agent: a Java application installed locally on the machine that provides an easy way to collect data and send it to Kinesis (Kinesis Data Streams or Kinesis Data Firehose). The agent continuously monitors a set of files in the specified directories and sends new data to Kinesis;
    • API Caller Script: a Python script that makes requests to the API and puts the responses into the folder monitored by the Kinesis Agent;
  • Kinesis Data Streams: a real-time data streaming service with wide scaling capabilities;
  • Kinesis Analytics: a serverless service that simplifies real-time analysis of streaming data. Amazon Kinesis Data Analytics provisions application resources and scales automatically to handle any volume of incoming data;
  • AWS Lambda: a service that lets you run code without provisioning or managing servers. Compute capacity is scaled automatically for each invocation;
  • Amazon DynamoDB: a key-value and document database that delivers latency under 10 milliseconds at any scale. With DynamoDB you don't need to provision, patch or manage any servers. DynamoDB automatically scales tables to match the available resources and maintain high performance. No system administration is required;
  • Amazon SNS: a fully managed publish/subscribe (Pub/Sub) messaging service that lets you decouple microservices, distributed systems and serverless applications. SNS can deliver information to end users via mobile push notifications, SMS messages and email.

Initial preparation

To emulate a data stream, I decided to use the air ticket data returned by the Aviasales API. The documentation lists quite a wide range of methods. Let's take one of them, the "Monthly price calendar", which returns prices for each day of a month, grouped by the number of transfers. If you don't specify the search month in the request, the API returns data for the month after the current one.

So, let's register and get our token.

An example request is below:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Passing the token as a request parameter, as above, works. However, I prefer to pass the access token in a header, so that is the approach used in the api_caller.py script.
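
As a minimal sketch of the header-based approach, the request can be built with the Python standard library alone. The X-Access-Token header name is the one api_caller.py uses. The build_request helper is hypothetical: it only builds the request without sending it, and it assumes KZN as the IATA code for Kazan in the usage below.

```python
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_request(token, origin, destination, currency="rub"):
    """Build (but do not send) a monthly price calendar request.

    The token travels in the X-Access-Token header instead of the
    query string.
    """
    query = urlencode({
        "currency": currency,
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    })
    return Request(f"{BASE_URL}?{query}",
                   headers={"X-Access-Token": token},
                   method="GET")
```

For example, build_request(token, "LED", "KZN") covers the St. Petersburg to Kazan route. Passing the result to urllib.request.urlopen() would execute the call. Keeping the token out of the URL also keeps it out of any log that records full request URLs.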

Example response:

{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

The example API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I'm from Kazan, and Phuket is now "only a dream", let's look for tickets from St. Petersburg to Kazan instead.
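
A few lines of Python are enough to pick the best offer from a response like the one above. This is a hedged sketch. The cheapest helper is hypothetical, and it treats the "value" field as the ticket price, which is how the sample response reads.

```python
import json

SAMPLE_RESPONSE = """{
  "success": true,
  "data": [{
    "show_to_affiliates": true, "trip_class": 0,
    "origin": "LED", "destination": "HKT",
    "depart_date": "2015-10-01", "return_date": "",
    "number_of_changes": 1, "value": 29127,
    "found_at": "2015-09-24T00:06:12+04:00",
    "distance": 8015, "actual": true
  }]
}"""


def cheapest(payload, max_changes=None):
    """Return the cheapest ticket in a month-matrix response, or None."""
    response = json.loads(payload)
    if not response.get("success"):
        return None
    tickets = response.get("data", [])
    if max_changes is not None:
        # Optionally drop itineraries with too many transfers.
        tickets = [t for t in tickets
                   if t["number_of_changes"] <= max_changes]
    # "value" is the price field in this response format.
    return min(tickets, key=lambda t: t["value"], default=None)
```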

This guide assumes you already have an AWS account. Note up front that Kinesis and SMS notifications are not included in the annual Free Tier. Even so, a couple of dollars is enough to build the proposed system and experiment with it. And, of course, don't forget to delete all resources once you no longer need them.

Fortunately, DynamoDB and Lambda will cost us nothing as long as we stay within the monthly free limits. For DynamoDB, for example, the limits are 25 GB of storage, 25 WCU/RCU and 100 million requests. For Lambda, the limit is one million function invocations per month.

Manual system deployment

Setting up Kinesis Data Streams

Let's go to the Kinesis Data Streams service and create two new streams, with one shard each.

What is a shard?
A shard is the base throughput unit of an Amazon Kinesis stream. One shard provides data input at 1 MB/s and data output at 2 MB/s, and it supports up to 1,000 PUT records per second. When creating a data stream, you specify the required number of shards. For example, a data stream with two shards provides data input at 2 MB/s and data output at 4 MB/s, and it supports up to 2,000 PUT records per second.
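
The shard arithmetic above is linear, so a small helper can capture it. This sketch uses the per-shard limits quoted in the text (1 MB/s in, 2 MB/s out, 1,000 PUT records per second). The names capacity and shards_needed are illustrative and not part of any AWS SDK.

```python
import math
from collections import namedtuple

# Per-shard limits quoted above.
WRITE_MB_S_PER_SHARD = 1
READ_MB_S_PER_SHARD = 2
PUT_RECORDS_S_PER_SHARD = 1000

Capacity = namedtuple("Capacity", "write_mb_s read_mb_s put_records_s")


def capacity(shards):
    """Aggregate stream throughput grows linearly with the shard count."""
    if shards < 1:
        raise ValueError("a stream needs at least one shard")
    return Capacity(shards * WRITE_MB_S_PER_SHARD,
                    shards * READ_MB_S_PER_SHARD,
                    shards * PUT_RECORDS_S_PER_SHARD)


def shards_needed(write_mb_s, put_records_s):
    """Smallest shard count that satisfies both write-side limits."""
    by_bytes = math.ceil(write_mb_s / WRITE_MB_S_PER_SHARD)
    by_records = math.ceil(put_records_s / PUT_RECORDS_S_PER_SHARD)
    return max(1, by_bytes, by_records)
```

capacity(2) reproduces the two-shard example from the text. shards_needed shows that whichever write limit is hit first, bytes or records, determines the shard count.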

The more shards a stream has, the higher its throughput. In principle, this is how streams scale: by adding shards. But more shards also mean a higher price. Each shard costs 1.5 cents per hour, plus 1.4 cents for every million PUT payload units.
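
A rough monthly estimate follows directly from these two rates. The sketch below assumes the prices quoted above, which vary by region and may change, and a 730-hour month. monthly_stream_cost is a hypothetical helper.

```python
SHARD_HOUR_USD = 0.015          # 1.5 cents per shard-hour
PUT_UNITS_MILLION_USD = 0.014   # 1.4 cents per million PUT payload units
HOURS_PER_MONTH = 730           # assumption: average month length


def monthly_stream_cost(shards, put_payload_units, hours=HOURS_PER_MONTH):
    """Rough monthly Kinesis Data Streams bill at the rates quoted above."""
    shard_cost = shards * hours * SHARD_HOUR_USD
    put_cost = put_payload_units / 1_000_000 * PUT_UNITS_MILLION_USD
    return round(shard_cost + put_cost, 2)
```

For the small single-shard streams in this article, shard-hours dominate the bill. Each stream costs about 11 USD per month if left running, which is why deleting the streams after the experiment matters.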

Let's create a new stream named airline_tickets. One shard is enough for it:

Now let's create another stream named special_stream:
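
The same two streams can also be created from code. This sketch assumes a boto3 Kinesis client is passed in. create_stream and the stream_exists waiter are part of boto3's Kinesis API, but the create_streams wrapper itself is hypothetical.

```python
def create_streams(kinesis, names, shard_count=1):
    """Create Kinesis streams and block until each one is ACTIVE.

    `kinesis` is expected to be a boto3 Kinesis client (or anything
    with the same create_stream/get_waiter methods).
    """
    for name in names:
        kinesis.create_stream(StreamName=name, ShardCount=shard_count)
    # The stream_exists waiter polls the stream until its status is ACTIVE.
    waiter = kinesis.get_waiter("stream_exists")
    for name in names:
        waiter.wait(StreamName=name)
```

With configured credentials, you would call it as create_streams(boto3.client("kinesis"), ["airline_tickets", "special_stream"]). Waiting matters because a freshly created stream starts in the CREATING state and may not accept writes until it becomes ACTIVE.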

Setting up the producer

To try the system out, a regular EC2 instance is enough as the data producer. It doesn't have to be a powerful, expensive virtual machine; a spot t2.micro will do just fine.

Important note: for this example, use the Amazon Linux AMI 2018.03.0 image. It needs less configuration to get the Kinesis Agent running quickly.

Go to the EC2 service and create a new virtual machine. Select the desired AMI with the t2.micro instance type, which is included in the Free Tier:

The new virtual machine needs permission to interact with the Kinesis service. The best way to grant it is to assign an IAM role. So, on the Step 3: Configure Instance Details screen, select Create new IAM role:

Creating an IAM role for EC2
In the window that opens, specify that we are creating a new role for EC2 and go to the Permissions section:

Since this is a training example, we don't need to go into the intricacies of fine-grained resource permissions. We'll select the policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.

Let's give this role a meaningful name, for example EC2-KinesisStreams-FullAccess. The result should look like the screenshot below:
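
For reference, here is roughly what the console does behind the scenes, sketched with boto3's IAM client. The code creates a role that EC2 is allowed to assume, attaches the two managed policies, and wraps the role in an instance profile, because EC2 instances receive roles through instance profiles. The IAM console creates that profile automatically; from code it has to be created explicitly. The create_ec2_role helper is hypothetical. The policy ARNs follow the standard arn:aws:iam::aws:policy/ naming for AWS managed policies.

```python
import json

# Lets the EC2 service assume the role on behalf of the instance.
EC2_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "ec2.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

# The two AWS managed policies selected in the console.
MANAGED_POLICY_ARNS = [
    "arn:aws:iam::aws:policy/AmazonKinesisFullAccess",
    "arn:aws:iam::aws:policy/CloudWatchFullAccess",
]


def create_ec2_role(iam, role_name="EC2-KinesisStreams-FullAccess"):
    """Create the role, attach the policies and expose it via an instance profile.

    `iam` is expected to be a boto3 IAM client.
    """
    iam.create_role(RoleName=role_name,
                    AssumeRolePolicyDocument=json.dumps(EC2_TRUST_POLICY))
    for arn in MANAGED_POLICY_ARNS:
        iam.attach_role_policy(RoleName=role_name, PolicyArn=arn)
    # EC2 instances receive a role through an instance profile.
    iam.create_instance_profile(InstanceProfileName=role_name)
    iam.add_role_to_instance_profile(InstanceProfileName=role_name,
                                     RoleName=role_name)
```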

After creating the new role, don't forget to attach it to the virtual machine you are creating:

Leave everything else on this screen unchanged and move on to the next screens.

You can leave the storage settings at their defaults, and the tags too. It is still good practice to add tags, at least to name the instance and indicate its environment.

Now we are on the Step 6: Configure Security Group tab. Here you need to create a new security group, or select an existing one, that allows ssh connections (port 22) to the instance. Select Source -> My IP, and you can launch the instance.

As soon as the instance reaches the running state, you can try connecting to it via ssh.

To work with the Kinesis Agent, connect to the machine and enter the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Let's create a folder to store the API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to configure it:

sudo vim /etc/aws-kinesis/agent.json

The contents of agent.json should look like this:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

With this configuration, the agent monitors files whose names end in "log" (for example, *.log) in the /var/log/airline_tickets/ directory. It parses them and sends the records to the airline_tickets stream.
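
To see what the CSVTOJSON option does to each line, here is a rough local approximation: it maps the comma-separated values onto customFieldNames in order. This is a hedged sketch, not the agent's code. It keeps every value as a string and raises an error when the field count doesn't match. The agent's own handling of malformed lines may differ.

```python
import csv
import json

# Field order copied from customFieldNames in agent.json.
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]


def csv_line_to_json(line):
    """Approximate the agent's CSVTOJSON conversion of one log line."""
    values = next(csv.reader([line]))
    if len(values) != len(FIELD_NAMES):
        raise ValueError(
            f"expected {len(FIELD_NAMES)} fields, got {len(values)}")
    return json.dumps(dict(zip(FIELD_NAMES, values)))
```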

Restart the service and make sure it is up and running:

sudo service aws-kinesis-agent restart

Now let's download the Python script that requests data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and saves the response in the directory that the Kinesis Agent scans. The implementation is fairly standard. A TicketsApi class calls the API asynchronously, and we pass it a header with the token plus the request parameters:

import sys

from aiohttp import ClientResponseError, ClientSession, FormData

# BASE_URL, CURRENCY, ORIGIN, DESTINATION, SHOW_TO_AFFILIATES, TRIP_DURATION,
# TARGET_FILE, LOGGER, log_maker() and the code that runs main() are
# defined elsewhere in api_caller.py.


class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                async with session.get(self.base_url, data=data) as response:
                    # Raises ClientResponseError on 4xx/5xx status codes.
                    response.raise_for_status()
                    LOGGER.info('Response status %s: %s',
                                self.base_url, response.status)
                    response_json = await response.json()
            except ClientResponseError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
        return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    # The token is sent in a header rather than in the URL.
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            # log_maker() writes the rows to TARGET_FILE for the Kinesis Agent.
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)
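
The listing above calls log_maker(), but its body is not shown. A hypothetical stand-in, write_tickets_log, could look like the code below. It appends one CSV row per ticket in the same order as customFieldNames in agent.json. It also maps the API's "value" field to cost and generates a random record_id to serve as the DynamoDB key. These choices are assumptions, not the author's implementation.

```python
import csv
import uuid

# Same order as customFieldNames in agent.json.
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]


def write_tickets_log(response, target_file):
    """Append one CSV row per ticket to the file tailed by the Kinesis Agent.

    Hypothetical stand-in for log_maker(); returns the number of rows written.
    """
    rows = 0
    with open(target_file, "a", newline="") as fh:
        # Plain "\n" endings: the agent treats each line as one record.
        writer = csv.writer(fh, lineterminator="\n")
        for ticket in response.get("data", []):
            row = {name: ticket.get(name, "") for name in FIELD_NAMES}
            row["cost"] = ticket.get("value", "")   # the API names the price "value"
            row["record_id"] = str(uuid.uuid4())    # unique DynamoDB partition key
            writer.writerow([row[name] for name in FIELD_NAMES])
            rows += 1
    return rows
```

The target file must match the agent's filePattern. For example, /var/log/airline_tickets/tickets.log would work; that file name is only illustrative.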

To check that the agent is configured correctly and working, let's run the api_caller.py script:

sudo ./api_caller.py TOKEN

Then check the results in the agent's log and on the Monitoring tab of the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

As you can see, everything works, and the Kinesis Agent successfully sends data to the stream. Now let's set up the consumer.

Setting up Kinesis Data Analytics

Let's move on to the central component of the whole system. Create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app:

Kinesis Data Analytics lets you run real-time analytics on Kinesis stream data using SQL. Unlike Kinesis Streams, it is a fully autoscaling service that:

  1. lets you create new streams (Output Stream) based on queries over the source data;
  2. provides a stream of the errors that occur while the application runs (Error Stream);
  3. can automatically detect the input data schema (you can redefine it manually if necessary).

This is not a cheap service: it costs 0.11 USD per hour of operation. Use it carefully and delete the application when you are done.

Let's connect the application to the data source:

Select the stream to connect to (airline_tickets):

Next, attach a new IAM role so that the application can read from and write to streams. To do this, just leave everything unchanged in the Access permissions block:

Now click "Discover schema" to detect the data schema in the stream. The IAM role will be updated (a new one will be created). Schema discovery will then run on the data that has already arrived in the stream:

Now go to the SQL editor. When you click this button, a window will ask whether to start the application; choose to start it:

Paste the following simple query into the SQL editor window and click Save and run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases, you work with tables: INSERT statements add records and SELECT statements query data. In Amazon Kinesis Data Analytics, you work with streams (STREAMs) and pumps (PUMPs). A pump is a continuous insert query that moves data from one in-application stream into another.

The SQL query above looks for Aeroflot tickets priced below five thousand rubles. All records that meet these conditions go into the DESTINATION_SQL_STREAM stream.
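
Conceptually, the pump is a filter that never finishes. It keeps reading SOURCE_SQL_STREAM_001 and emits matching rows as they arrive. A Python generator has the same shape, so here is a sketch of the pump's logic. It is only an analogy for reasoning about the query locally, not how Kinesis Data Analytics executes SQL. The special_offers name is hypothetical.

```python
def special_offers(records, max_cost=5000, gate="Aeroflot"):
    """Python analogue of STREAM_PUMP: filter and project record by record.

    Like the pump, the generator produces output as input arrives and
    never needs the whole stream in memory.
    """
    for record in records:
        cost = float(record["cost"])
        # WHERE "cost" < 5000 AND "gate" = 'Aeroflot'
        if cost < max_cost and record["gate"] == gate:
            # SELECT STREAM "cost", "gate" INTO DESTINATION_SQL_STREAM
            yield {"cost": cost, "gate": record["gate"]}
```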

In the Destination block, select the special_stream stream. Then select DESTINATION_SQL_STREAM from the In-application stream name drop-down list:

The result of all these steps should look something like the screenshot below:

Creating and subscribing to an SNS topic

Go to Simple Notification Service and create a new topic named Airlines:

Subscribe to this topic and enter the mobile phone number that will receive the SMS notifications:
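
You can also create the topic and the SMS subscription with boto3's SNS client. In this sketch, create_topic returns the topic ARN; if a topic with that name already exists, it returns the existing ARN. subscribe with Protocol="sms" expects the number in E.164 format. The create_sms_topic helper and its format check are our own.

```python
import re

# E.164: "+", country code and subscriber number, at most 15 digits in total.
E164 = re.compile(r"^\+[1-9]\d{1,14}$")


def create_sms_topic(sns, topic_name, phone_number):
    """Create (or reuse) an SNS topic and subscribe a phone number via SMS.

    `sns` is expected to be a boto3 SNS client.
    """
    if not E164.match(phone_number):
        raise ValueError("phone number must be in E.164 format, e.g. +15555550100")
    # create_topic is idempotent: an existing topic's ARN is returned as is.
    topic_arn = sns.create_topic(Name=topic_name)["TopicArn"]
    sns.subscribe(TopicArn=topic_arn, Protocol="sms", Endpoint=phone_number)
    return topic_arn
```

The returned ARN is the value the notifier Lambda later needs in its TOPIC_ARN environment variable.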

Creating a table in DynamoDB

To store the raw data from the airline_tickets stream, let's create a DynamoDB table with the same name. We will use record_id as the primary key:
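
In code, creating this table comes down to a single create_table call. The helper below only builds the call's arguments, so you can inspect them before anything is created. Two things are assumptions here. record_id is declared as a string partition key, and the provisioned capacity of 5 read and 5 write units was chosen to stay within the free-tier limits mentioned earlier.

```python
def tickets_table_spec(table_name="airline_tickets", capacity_units=5):
    """Keyword arguments for DynamoDB create_table (record_id as partition key)."""
    return {
        "TableName": table_name,
        "KeySchema": [{"AttributeName": "record_id", "KeyType": "HASH"}],
        # Only key attributes are declared; other ticket fields need no schema.
        "AttributeDefinitions": [
            {"AttributeName": "record_id", "AttributeType": "S"},
        ],
        # Assumption: small provisioned capacity within the free tier.
        "ProvisionedThroughput": {
            "ReadCapacityUnits": capacity_units,
            "WriteCapacityUnits": capacity_units,
        },
    }
```

With credentials configured, boto3.client("dynamodb").create_table(**tickets_table_spec()) would then create the table.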

Creating the collector Lambda function

Let's create a Lambda function called Collector. Its job is to poll the airline_tickets stream and insert any new records it finds into the DynamoDB table. Besides the default permissions, this Lambda obviously needs read access to the Kinesis data stream and write access to DynamoDB.

Creating an IAM role for the Lambda function
First, let's create a new IAM role for the Lambda named Lambda-TicketsProcessingRole:

For this test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are enough, as shown in the screenshot below:

Kinesis should trigger this Lambda when new records arrive in the airline_tickets stream, so we need to add a new trigger:
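
Behind the scenes, the trigger the console creates is an event source mapping. Here is a hedged boto3 equivalent. It assumes the function is named Collector and that you know the stream ARN. create_event_source_mapping is the real Lambda API call; the attach_stream_trigger helper and the batch size of 100 are illustrative choices.

```python
def attach_stream_trigger(lambda_client, stream_arn, function_name,
                          batch_size=100):
    """Make `function_name` run on new records in the Kinesis stream.

    `lambda_client` is expected to be a boto3 Lambda client.
    """
    return lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        # LATEST: only records written after the mapping exists are read.
        StartingPosition="LATEST",
        BatchSize=batch_size,
    )
```

The same call, pointed at special_stream and the notifier function, would cover the second Lambda.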

All that remains is to paste in the code and save the Lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
from decimal import Decimal

import boto3

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'


class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        # Kinesis delivers every record payload base64-encoded.
        decoded_record_data = [base64.b64decode(record['kinesis']['data'])
                               for record in records]
        json_data = [json.loads(decoded_record)
                     for decoded_record in decoded_record_data]
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        # Values may arrive as strings, so numeric fields are converted here.
        # DynamoDB does not accept Python floats, hence Decimal for the price.
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(str(json_item.get('cost'))),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        # batch_writer() buffers the puts and sends them in batches.
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(Item=dynamodb_item)

        print('Items added:', len(self.json_data))


def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
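
To try the handler before wiring up the trigger, you only need a synthetic event in the shape it reads: Records, each with a base64-encoded kinesis.data payload. This is a sketch. Real Kinesis events carry more fields (partitionKey, sequenceNumber and others) that this handler ignores, and both helper names are hypothetical.

```python
import base64
import json


def make_kinesis_event(items):
    """Wrap dicts in the minimal Records/kinesis/data shape the handler reads."""
    return {
        "Records": [
            {"kinesis": {"data": base64.b64encode(
                json.dumps(item).encode("utf-8")).decode("ascii")}}
            for item in items
        ]
    }


def decode_event(event):
    """Decode records the same way TicketsParser.get_json_data does."""
    return [json.loads(base64.b64decode(record["kinesis"]["data"]))
            for record in event["Records"]]
```

json.dumps(make_kinesis_event([...])) produces a document you can paste into a Lambda console test event. Note that invoking the real handler with it writes to the DynamoDB table.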

Creating the notifier Lambda function

The second Lambda function is created in a similar way. It monitors the second stream (special_stream) and sends notifications to SNS. This Lambda therefore needs read access to Kinesis and permission to publish messages to the SNS topic. SNS then delivers the messages to all subscribers of the topic (email, SMS, etc.).

Creating the IAM role
First, create the IAM role Lambda-KinesisAlarm for this Lambda. Then assign the role to the alarm_notifier Lambda you are creating:

This Lambda should run when new records arrive in special_stream. Configure its trigger the same way as for the Collector Lambda.

To make this Lambda easier to configure, let's add a new environment variable, TOPIC_ARN, holding the ARN (Amazon Resource Name) of the Airlines topic:

Then paste in the Lambda code. It is not complicated at all:

import os

import boto3

SNS_CLIENT = boto3.client('sns')
# ARN of the Airlines topic, set as a Lambda environment variable.
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    """Publish an alarm to the SNS topic when special_stream gets records."""
    try:
        # Subject is used for email subscribers; SMS messages ignore it.
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found something interesting!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been published')
    except Exception as err:
        print('Delivery failure', str(err))
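
The notifier above sends a fixed text. A hedged variant could put the matching tickets into the SMS itself. This assumes the special_stream records arrive as JSON objects with the "cost" and "gate" columns of DESTINATION_SQL_STREAM, i.e. that JSON was chosen as the destination's output format. The format_alarm helper and the RUB label are hypothetical; RUB matches the currency requested by api_caller.py.

```python
import base64
import json

MAX_OFFERS_IN_SMS = 3  # keep the text short enough for a single SMS


def format_alarm(event):
    """Build an SMS body from special_stream records, cheapest first.

    Assumes each record is a JSON object with "cost" and "gate" fields.
    Returns None when the event holds no records.
    """
    offers = [json.loads(base64.b64decode(r["kinesis"]["data"]))
              for r in event.get("Records", [])]
    if not offers:
        return None
    offers.sort(key=lambda o: float(o["cost"]))
    lines = ["{}: {:.0f} RUB".format(o["gate"], float(o["cost"]))
             for o in offers[:MAX_OFFERS_IN_SMS]]
    return "Cheap tickets found!\n" + "\n".join(lines)
```

Inside lambda_handler, Message=format_alarm(event) would replace the fixed text, with publish skipped whenever the function returns None.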

That seems to complete the manual setup. All that remains is to test it and make sure everything is configured correctly.

Deploying from Terraform code

Prerequisites

Terraform is a very convenient open-source tool for deploying infrastructure from code. Its syntax is easy to learn, and there are plenty of examples of what to deploy and how. The Atom editor and Visual Studio Code have many handy plugins that make working with Terraform easier.

You can download the distribution here. A detailed review of Terraform's capabilities is beyond the scope of this article, so we will stick to the main points.

How to get started

The full project code is in my repository; clone it locally. Before starting, make sure the AWS CLI is installed and configured, because Terraform looks for credentials in the ~/.aws/credentials file.

It is good practice to run the plan command before deploying the whole infrastructure. It shows what Terraform is going to create for us in the cloud:

terraform.exe plan

You will be prompted for the phone number to send notifications to. You don't have to enter it at this stage.

After reviewing the plan, we can start creating the resources:

terraform.exe apply

After you run this command, you will be asked for the phone number again. Type "yes" when asked to confirm the actions. Terraform then sets up the whole infrastructure: it performs all the necessary EC2 configuration, deploys the Lambda functions, and so on.

Once Terraform has created all the resources, open the details of the Kinesis Analytics application. Unfortunately, I didn't find a way to do this step directly from code.

Start the application:

Then explicitly set the in-application stream name by selecting it from the drop-down list:

Now everything is ready to go.

Testing the application

Whether you deployed the system manually or from Terraform code, it works the same way.

Connect via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:

sudo ./api_caller.py TOKEN

All you have to do now is wait for an SMS on your phone:

The SMS arrives in about a minute:

Finally, let's check that the records were saved in DynamoDB for later, more detailed analysis. The airline_tickets table contains data roughly like this:

Conclusion

In this article, we built an online data processing system based on Amazon Kinesis. We used the Kinesis Agent together with Kinesis Data Streams and ran real-time analytics in Kinesis Analytics with SQL. We also connected Amazon Kinesis to other AWS services.

We deployed the system in two ways: a rather long manual one and a quick one from Terraform code.

All the project source code is available in my GitHub repository. I encourage you to take a look at it.

I'm happy to discuss the article and look forward to your comments. Constructive criticism is welcome.

I wish you success!

Source: www.habr.com
