Aviasales API integration with Amazon Kinesis and serverless simplicity

Hi, Habr!

Do you like flying on planes? I love it, but during self-isolation I also fell in love with analyzing airline ticket data from one well-known resource, Aviasales.

Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, set up the Amazon DynamoDB NoSQL database as the main data store, and configure SMS notifications for interesting tickets.

All the details are under the cut! Let's go!


Introduction

For this example we need access to the Aviasales API. Access is free of charge and unrestricted; you only need to register in the "Developers" section to receive the API token used to request data.

The main purpose of this article is to give a general understanding of working with streaming data in AWS. We keep in mind that the data returned by the API is not strictly up to date: it is served from a cache built from searches made by users of Aviasales.ru and Jetradar.com over the last 48 hours.

The Kinesis Agent, installed on the producer machine, will automatically parse the data received through the API and send it to the required stream, which is then processed by Kinesis Data Analytics. The raw version of this stream will be written directly to the store. Keeping the raw data in DynamoDB allows deeper ticket analysis through BI tools such as AWS QuickSight.

We will consider two options for deploying the whole infrastructure:

  • Manual - through the AWS Management Console;
  • Infrastructure from Terraform code - for lazy automators;

Architecture of the system being built

Components used:

  • Aviasales API - the data returned by this API will be used for all subsequent work;
  • EC2 Producer Instance - a regular virtual machine in the cloud on which the input data stream will be generated:
    • Kinesis Agent - a Java application installed locally on the machine that provides an easy way to collect and send data to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent constantly monitors a set of files in the specified directories and sends new data to Kinesis;
    • API Caller Script - a Python script that makes requests to the API and puts the response into a folder monitored by the Kinesis Agent;
  • Kinesis Data Streams - a real-time data streaming service with wide scaling capabilities;
  • Kinesis Analytics - a serverless service that simplifies the analysis of streaming data in real time. Amazon Kinesis Data Analytics provisions application resources and scales automatically to handle any volume of incoming data;
  • AWS Lambda - a service that lets you run code without provisioning or managing servers. All computing power is scaled automatically for each call;
  • Amazon DynamoDB - a key-value and document database that delivers latency below 10 milliseconds at any scale. With DynamoDB you do not need to provision, patch, or manage any servers. DynamoDB automatically scales tables to adjust the amount of available resources and maintain high performance. No system administration is required;
  • Amazon SNS - a fully managed messaging service built on the publisher-subscriber (Pub/Sub) model, which lets you decouple microservices, distributed systems and serverless applications. SNS can be used to send information to end users through mobile push notifications, SMS messages and emails.

Initial preparation

To emulate a data stream, I decided to use the airline ticket information returned by the Aviasales API. The documentation lists quite a few different methods; let's take one of them, "Monthly Price Calendar", which returns prices for each day of the month, grouped by the number of transfers. If you do not specify the search month in the request, data is returned for the month following the current one.

So, let's register and get our token.

An example request is below:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Receiving data from the API by passing the token in the request, as above, will work, but I prefer to pass the access token through a header, so that is the method used in the api_caller.py script.
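As a minimal sketch of the header approach, using only the Python standard library: the helper name build_request and its default arguments (including KZN as the destination code for Kazan) are illustrative assumptions and are not taken from api_caller.py; the X-Access-Token header name is the one the script itself uses further down.

```python
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_request(api_token, origin="LED", destination="KZN", currency="rub"):
    """Build a GET request that carries the token in a header (hypothetical helper)."""
    query = urlencode({
        "currency": currency,
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    })
    # The token travels in the X-Access-Token header, not in the URL.
    return Request(f"{BASE_URL}?{query}", headers={"X-Access-Token": api_token})


request = build_request("TOKEN_API")
print(request.full_url)
# To actually send it: urllib.request.urlopen(request).read()
```

The printed URL no longer contains the token, so it can be logged or shared without leaking the credential.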

Example response:

{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

The example API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I am from Kazan, and Phuket is now "only a dream", let's look for tickets from St. Petersburg to Kazan.

It is assumed that you already have an AWS account. I want to point out right away that Kinesis and sending notifications via SMS are not included in the annual Free Tier (free usage). Even so, with a couple of dollars in mind, it is quite possible to build the proposed system and play with it. And, of course, do not forget to delete all resources once they are no longer needed.

Fortunately, DynamoDB and Lambda functions will be free for us as long as we stay within the monthly free limits. For example, for DynamoDB: 25 GB of storage, 25 WCU/RCU and 100 million requests. And a million Lambda function calls per month.

Manual deployment of the system

Setting up Kinesis Data Streams

Let's go to the Kinesis Data Streams service and create two new streams, with one shard each.

What is a shard?
A shard is the base throughput unit of an Amazon Kinesis stream. One shard provides input data transfer at 1 MB/s and output data transfer at 2 MB/s. One shard supports up to 1000 PUT records per second. When creating a data stream, you need to specify the required number of shards. For example, you can create a data stream with two shards. That stream provides input data transfer at 2 MB/s and output data transfer at 4 MB/s, and supports up to 2000 PUT records per second.
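The per-shard limits above translate into a simple sizing rule. The following is a rough sketch, not an official AWS calculator; the function name shards_needed is hypothetical and the constants are just the figures quoted in the text.

```python
import math

# Per-shard limits quoted above.
SHARD_INGEST_MB_PER_S = 1
SHARD_EGRESS_MB_PER_S = 2
SHARD_PUT_RECORDS_PER_S = 1000


def shards_needed(ingest_mb_s, egress_mb_s, put_records_s):
    """Return the smallest shard count that satisfies all three limits."""
    return max(
        1,
        math.ceil(ingest_mb_s / SHARD_INGEST_MB_PER_S),
        math.ceil(egress_mb_s / SHARD_EGRESS_MB_PER_S),
        math.ceil(put_records_s / SHARD_PUT_RECORDS_PER_S),
    )


# The two-shard example from the text: 2 MB/s in, 4 MB/s out, 2000 records/s.
print(shards_needed(ingest_mb_s=2, egress_mb_s=4, put_records_s=2000))  # 2
```

For the tiny volume of data in this article, every limit is far from being reached, which is why one shard per stream is enough.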

The more shards in your stream, the greater its throughput. In principle, this is how streams are scaled: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus an additional 1.4 cents per million PUT payload units.
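To get a feel for the bill, here is a back-of-the-envelope sketch that uses only the two prices quoted above. The function name stream_cost_usd is hypothetical, and actual AWS pricing varies by region and over time, so treat the result as an estimate.

```python
SHARD_HOUR_USD = 0.015          # 1.5 cents per shard per hour (figure from the text)
PUT_UNITS_MILLION_USD = 0.014   # 1.4 cents per million PUT payload units


def stream_cost_usd(shards, hours, put_payload_units):
    """Estimate the cost of running a stream for a given number of hours."""
    shard_cost = shards * hours * SHARD_HOUR_USD
    put_cost = put_payload_units / 1_000_000 * PUT_UNITS_MILLION_USD
    return round(shard_cost + put_cost, 4)


# Two single-shard streams left running for a day with a small amount of traffic.
print(stream_cost_usd(shards=2, hours=24, put_payload_units=10_000))
```

With traffic this small the shard-hours dominate the cost, which is another reason to delete the streams as soon as the experiment is over.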

Let's create a new stream named airline_tickets; 1 shard is enough for it:

Now let's create another stream named special_stream:

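The same two streams could also be created from code rather than the console. This is a sketch under assumptions: the helper name create_streams is hypothetical, and it expects a boto3 Kinesis client to be passed in, so boto3 and configured AWS credentials are needed only when it is actually used.

```python
def create_streams(kinesis_client, names, shard_count=1):
    """Create each stream in `names` with `shard_count` shards."""
    for name in names:
        # create_stream is asynchronous: the stream starts in the CREATING state.
        kinesis_client.create_stream(StreamName=name, ShardCount=shard_count)
    return list(names)


# Usage (requires boto3 and configured AWS credentials):
#   import boto3
#   create_streams(boto3.client("kinesis"), ["airline_tickets", "special_stream"])
```

Passing the client in as an argument keeps the helper easy to try out with a stub before pointing it at a real account.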

Producer setup

To work through the task, it is enough to use a regular EC2 instance as the data producer. It does not need to be a powerful, expensive virtual machine; a spot t2.micro will do just fine.

Important note: for this example, use the Amazon Linux AMI 2018.03.0 image; it needs fewer settings to get the Kinesis Agent running quickly.

Go to the EC2 service, create a new virtual machine, and select the required AMI with the t2.micro type, which is included in the Free Tier:

For the newly created virtual machine to interact with the Kinesis service, it must be granted the rights to do so. The best way is to assign an IAM role. So, on the Step 3: Configure Instance Details screen, select Create new IAM role:

Creating an IAM role for EC2
In the window that opens, select that we are creating a new role for EC2 and go to the Permissions section:

For this training example we do not need to go into all the intricacies of fine-grained resource permissions, so we select the policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.

Let's give this role a meaningful name, for example: EC2-KinesisStreams-FullAccess. The result should match the picture below:

After creating this new role, do not forget to attach it to the virtual machine instance being created:

We do not change anything else on this screen and move on to the next windows.

The hard drive settings can be left at their defaults, as can the tags (although using tags is good practice: at least give the instance a name and indicate the environment).

Now we are on the Step 6: Configure Security Group tab, where you need to create a new security group or specify an existing one that lets you connect to the instance via ssh (port 22). Select Source -> My IP there and you can launch the instance.

As soon as it switches to the running state, you can try to connect to it via ssh.

To be able to work with the Kinesis Agent, after successfully connecting to the machine, enter the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Let's create a folder to store the API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to edit its config:

sudo vim /etc/aws-kinesis/agent.json

The contents of the agent.json file should look like this:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As the configuration file shows, the agent monitors files with the .log extension in the /var/log/airline_tickets/ directory, parses them, and sends them to the airline_tickets stream.
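Because the agent uses the CSVTOJSON option, each line written to the log has to be a CSV row whose columns follow the order of customFieldNames. The article does not show how the producer script writes those lines, so the following is only a hypothetical sketch: the function name to_csv_lines, the mapping of the API field "value" to the "cost" column, the empty values for fields missing from the response, and the UUID used for record_id are all assumptions.

```python
import csv
import io
import uuid

# Column order must match customFieldNames in agent.json.
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]


def to_csv_lines(response):
    """Flatten the API response into CSV text, one ticket per line."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for item in response.get("data", []):
        row = dict(item)
        row["cost"] = item.get("value", "")        # assumption: "value" is the price
        row["record_id"] = str(uuid.uuid4())       # assumption: synthetic unique key
        # Fields absent from the response are written as empty columns.
        writer.writerow([row.get(name, "") for name in FIELD_NAMES])
    return buffer.getvalue()


sample = {"success": True, "data": [{"origin": "LED", "destination": "HKT",
                                     "value": 29127, "number_of_changes": 1}]}
print(to_csv_lines(sample), end="")
```

Appending text like this to a .log file inside /var/log/airline_tickets/ is what lets the agent pick the rows up and turn them into JSON records.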

We restart the service and make sure it is up and running:

sudo service aws-kinesis-agent restart

Let's download the Python script that will request data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and saves the received response in the directory that the Kinesis Agent scans. The implementation is fairly standard: there is a TicketsApi class that lets you call the API asynchronously. We pass a header with the token and the request parameters to this class:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

To check that the agent is configured correctly and working, let's do a test run of the api_caller.py script:

sudo ./api_caller.py TOKEN

Then we look at the result in the Agent logs and on the Monitoring tab of the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

As you can see, everything works and the Kinesis Agent successfully sends data to the stream. Now let's configure the consumer.

Setting up Kinesis Data Analytics

Let's move on to the central component of the whole system: create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app:

Kinesis Data Analytics lets you run real-time analytics on data from Kinesis Streams using SQL. It is a fully autoscaling service (unlike Kinesis Streams) that:

  1. lets you create new streams (Output Stream) based on queries against the source data;
  2. provides a stream with the errors that occurred while the application was running (Error Stream);
  3. can automatically detect the schema of the input data (it can be redefined manually if necessary).

This is not a cheap service: 0.11 USD per hour of work, so use it carefully and delete it when you are done.

Let's connect the application to the data source:

Select the stream we are going to connect to (airline_tickets):

Next, you need to attach a new IAM role so that the application can read from the stream and write to the stream. To do this, it is enough to leave everything in the Access permissions block unchanged:

Now let's request discovery of the data schema in the stream; to do this, click the "Discover schema" button. As a result, the IAM role will be updated (a new one will be created) and schema detection will run on the data that has already arrived in the stream:

Now you need to go to the SQL editor. When you click this button, a window appears asking you to start the application; choose to start it:

Insert the following simple query into the SQL editor window and click Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases, you work with tables, using INSERT statements to add records and SELECT statements to query data. In Amazon Kinesis Data Analytics, you work with streams (STREAMs) and pumps (PUMPs): continuous insert queries that move data from one in-application stream into another.

The SQL query above searches for Aeroflot tickets priced below five thousand rubles. All records that meet these conditions are placed in the DESTINATION_SQL_STREAM stream.
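To make the semantics of the pump concrete, here is a plain Python analogue of the same filter. It is only an illustration of the WHERE clause and the selected columns, not how Kinesis Data Analytics is implemented; the function names and the sample records (including the "OtherAgency" gate) are made up.

```python
def matches(record, max_cost=5000, gate="Aeroflot"):
    """Mirror of the WHERE clause: cost below the limit and the given gate."""
    return float(record["cost"]) < max_cost and record["gate"] == gate


def pump(source_records):
    """Yield only the (cost, gate) pairs that would reach the destination stream."""
    for record in source_records:
        if matches(record):
            yield {"cost": float(record["cost"]), "gate": record["gate"]}


records = [
    {"cost": "4200", "gate": "Aeroflot"},
    {"cost": "6100", "gate": "Aeroflot"},
    {"cost": "3900", "gate": "OtherAgency"},
]
print(list(pump(records)))  # [{'cost': 4200.0, 'gate': 'Aeroflot'}]
```

The key difference is that the real pump never finishes: it keeps evaluating the query against every new record that arrives in the source stream.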

In the Destination block, select the special_stream stream, and in the In-application stream name drop-down list select DESTINATION_SQL_STREAM:

The result of all these steps should look similar to the picture below:

Creating and subscribing to an SNS topic

Go to the Simple Notification Service and create a new topic there named Airlines:

Subscribe to this topic and specify the mobile phone number to which SMS notifications will be sent:

Creating a table in DynamoDB

To store the raw data from the airline_tickets stream, let's create a table in DynamoDB with the same name. We will use record_id as the primary key:

Creating the collector lambda function

Let's create a lambda function called Collector, whose task is to poll the airline_tickets stream and, if new records are found there, insert them into the DynamoDB table. Obviously, in addition to the default rights, this lambda must have read access to the Kinesis data stream and write access to DynamoDB.

Creating an IAM role for the collector lambda function
First, let's create a new IAM role for the lambda named Lambda-TicketsProcessingRole:

For the test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable, as shown in the picture below:

This lambda should be launched by a trigger from Kinesis when new records arrive in the airline_tickets stream, so we need to add a new trigger:

All that remains is to insert the code and save the lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
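Before wiring up the trigger, it can help to see what the handler receives. The sketch below builds a fake event in the shape Lambda uses for Kinesis records (base64-encoded payload under record['kinesis']['data']) and decodes it with the same steps as get_json_data. The helper names and the sample ticket are hypothetical, and DynamoDB is not touched at all.

```python
import base64
import json


def make_kinesis_event(items):
    """Wrap dicts the way Lambda receives Kinesis records (base64-encoded JSON)."""
    return {"Records": [
        {"kinesis": {"data": base64.b64encode(json.dumps(item).encode()).decode()}}
        for item in items
    ]}


def decode_records(records):
    """Same decoding steps as TicketsParser.get_json_data above."""
    return [json.loads(base64.b64decode(r["kinesis"]["data"])) for r in records]


event = make_kinesis_event([{"record_id": "1", "cost": "4200", "gate": "Aeroflot"}])
print(decode_records(event["Records"]))
```

An event built this way can also be pasted into the Lambda console as a test event to exercise the function without waiting for real data.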

Creating the notifier lambda function

The second lambda function, which monitors the second stream (special_stream) and sends a notification to SNS, is created in a similar way. This lambda therefore needs access to read from Kinesis and to publish messages to the given SNS topic; the SNS service then delivers them to all subscribers of that topic (email, SMS, and so on).

Creating an IAM role
First, we create the IAM role Lambda-KinesisAlarm for this lambda, and then assign this role to the alarm_notifier lambda being created:

This lambda should fire on a trigger for new records entering special_stream, so configure the trigger in the same way as we did for the Collector lambda.

To make this lambda easier to configure, let's introduce a new environment variable, TOPIC_ARN, which holds the ARN (Amazon Resource Name) of the Airlines topic:

And insert the lambda code, which is not complicated at all:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))
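The handler above sends a fixed text and ignores the event. As a possible refinement, the message could be composed from the records that triggered the function. This is a hypothetical sketch: build_message is not part of the original project, and it assumes that the records in special_stream are JSON objects carrying the "cost" and "gate" columns of DESTINATION_SQL_STREAM.

```python
import base64
import json


def build_message(event):
    """Compose an SMS text from the records that triggered the lambda."""
    lines = []
    for record in event.get("Records", []):
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        # Assumption: records carry the "cost" and "gate" columns of
        # DESTINATION_SQL_STREAM, serialized as JSON.
        lines.append(f"{payload.get('gate')}: {payload.get('cost')} RUB")
    return "Tickets found:\n" + "\n".join(lines) if lines else "No tickets found"


sample_event = {"Records": [{"kinesis": {"data": base64.b64encode(
    json.dumps({"cost": 4200.0, "gate": "Aeroflot"}).encode()).decode()}}]}
print(build_message(sample_event))
```

Inside lambda_handler, the returned text could be passed as the Message argument of SNS_CLIENT.publish in place of the fixed string.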

It seems that this completes the manual configuration of the system. All that remains is to test it and make sure we have configured everything correctly.

Deploying from Terraform code

Required preparation

Terraform is a very convenient open-source tool for deploying infrastructure from code. It has its own syntax that is easy to learn, and there are plenty of examples of how and what to deploy. The Atom editor and Visual Studio Code have many handy plugins that make working with Terraform easier.

You can download the distribution from here. A detailed analysis of all Terraform capabilities is beyond the scope of this article, so we will limit ourselves to the main points.

How to run

The full code of the project is in my repository. Clone the repository to your machine. Before starting, make sure that you have the AWS CLI installed and configured, because Terraform will look for credentials in the ~/.aws/credentials file.
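A quick way to confirm that the credentials file is in place before running Terraform is sketched below. It only checks that the file defines a profile with both key fields; it does not validate the keys against AWS, and it ignores other credential sources such as environment variables. The function name has_profile is hypothetical.

```python
import configparser
from pathlib import Path


def has_profile(credentials_path, profile="default"):
    """Return True if the file defines both key fields for the profile."""
    parser = configparser.ConfigParser()
    parser.read(credentials_path)  # a missing file simply yields no sections
    if not parser.has_section(profile):
        return False
    section = parser[profile]
    return "aws_access_key_id" in section and "aws_secret_access_key" in section


# Usage:
#   print(has_profile(Path.home() / ".aws" / "credentials"))
```

If the check returns False, running aws configure creates the file with a default profile.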

It is good practice to run the plan command before deploying the entire infrastructure, to see what Terraform is about to create for us in the cloud:

terraform.exe plan

You will be prompted to enter a phone number to send notifications to. It is not necessary to enter it at this stage.

Having reviewed the execution plan, we can start creating resources:

terraform.exe apply

After running this command, you will again be asked to enter a phone number; type "yes" when asked whether to actually perform the actions. This brings up the entire infrastructure, carries out all the necessary EC2 configuration, deploys the lambda functions, and so on.

After all resources have been successfully created through the Terraform code, you need to go into the details of the Kinesis Analytics application (unfortunately, I did not find a way to do this directly from the code).

Launch the application:

After that, you must explicitly set the in-application stream name by selecting it from the drop-down list:

Now everything is ready to go.

Testing the application

Regardless of how you deployed the system, manually or through Terraform code, it works the same way.

We log in via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:

sudo ./api_caller.py TOKEN

All you have to do now is wait for an SMS to your number:

SMS: the message arrives on the phone in about 1 minute:

It remains to check whether the records were saved in the DynamoDB database for later, more detailed analysis. The airline_tickets table contains approximately the following data:

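The same check can be done from code instead of the console. This is a minimal sketch that assumes a boto3 DynamoDB table resource is passed in; the helper name peek_items is hypothetical, and a scan with a small Limit is used only because the table here is tiny.

```python
def peek_items(table, limit=5):
    """Return up to `limit` items from a DynamoDB table resource."""
    response = table.scan(Limit=limit)
    return response.get("Items", [])


# Usage (requires boto3 and configured AWS credentials):
#   import boto3
#   table = boto3.resource("dynamodb").Table("airline_tickets")
#   for item in peek_items(table):
#       print(item)
```

On a large table a scan reads far more data than needed, so a query by record_id would be the better tool there.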

Conclusion

In the course of this work, an online data processing system based on Amazon Kinesis was built. We looked at using the Kinesis Agent together with Kinesis Data Streams, at real-time analytics in Kinesis Analytics with SQL commands, and at how Amazon Kinesis interacts with other AWS services.

We deployed the system in two ways: a rather long manual one and a quick one from Terraform code.

All the project source code is available in my GitHub repository; I suggest you take a look at it.

I am happy to discuss the article and look forward to your comments. I hope for constructive criticism.

I wish you success!

Source: www.habr.com
