Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless

E Habr!

Makemake ʻoe i nā mokulele lele? Aloha au iā ia, akā i ka wā kaʻawale, ua aloha au i ka nānā ʻana i ka ʻikepili i nā tiketi mokulele mai kahi kumu kaulana - Aviasales.

I kēia lā, e kālailai mākou i ka hana a Amazon Kinesis, kūkulu i kahi ʻōnaehana streaming me ka ʻikepili manawa maoli, e hoʻokomo i ka waihona ʻo Amazon DynamoDB NoSQL ma ke ʻano he waihona ʻikepili nui, a hoʻonohonoho i nā leka SMS no nā tiketi hoihoi.

Aia nā kikoʻī a pau ma lalo o ka ʻoki! Hele!

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless

Hōʻike

No ka laʻana, pono mākou e komo i API no nā Aviasales. Hāʻawi ʻia ka loaʻa iā ia me ka uku ʻole a me ka ʻole o ka palena; pono ʻoe e hoʻopaʻa inoa ma ka ʻāpana "Developers" e loaʻa i kāu hōʻailona API e komo i ka ʻikepili.

ʻO ke kumu nui o kēia ʻatikala ʻo ia ka hāʻawi ʻana i ka ʻike ākea o ka hoʻohana ʻana i ka ʻike e kahe ana i ka AWS; ke noʻonoʻo nei mākou ʻaʻole pololei loa ka ʻikepili i hoʻihoʻi ʻia e ka API i hoʻohana ʻia a ua hoʻouna ʻia mai ka cache, ʻo ia hoʻi. i hoʻokumu ʻia ma muli o ka huli ʻana e nā mea hoʻohana o nā pūnaewele Aviasales.ru a me Jetradar.com no nā hola 48 i hala.

ʻO Kinesis-agent, i hoʻokomo ʻia ma ka mīkini hana, loaʻa ma o ka API e hoʻokaʻawale a hoʻouna i ka ʻikepili i ke kahawai makemake ʻia ma o Kinesis Data Analytics. E kākau pololei ʻia ka mana o kēia kahawai i ka hale kūʻai. ʻO ka mālama ʻikepili maka i hoʻoili ʻia ma DynamoDB e ʻae i ka loiloi tiketi hohonu ma o nā mea hana BI, e like me AWS Quick Sight.

E noʻonoʻo mākou i ʻelua mau koho no ka hoʻohana ʻana i ka ʻōnaehana holoʻokoʻa:

  • Manual - ma o AWS Management Console;
  • ʻO ke kūkulu ʻana mai ke code Terraform no nā mea ʻenehana palaualelo;

Hoʻolālā o ka ʻōnaehana i kūkulu ʻia

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Nā ʻāpana i hoʻohana ʻia:

  • API no nā Aviasales - e hoʻohana ʻia ka ʻikepili i hoʻihoʻi ʻia e kēia API no nā hana a pau;
  • EC2 Mea Hana Hana - he mīkini virtual maʻamau i ke ao kahi e hoʻopuka ʻia ai ke kahawai ʻikepili komo:
    • Agena Kinesis He polokalamu Java i hoʻokomo ʻia ma ka ʻāina ma ka mīkini e hāʻawi i kahi ala maʻalahi e hōʻiliʻili a hoʻouna i nā ʻikepili iā Kinesis (Kinesis Data Streams a i ʻole Kinesis Firehose). Ke nānā mau nei ka luna i kahi hoʻonohonoho o nā faila i nā papa kuhikuhi i kuhikuhi ʻia a hoʻouna i nā ʻikepili hou iā Kinesis;
    • Palapala Kāhea API - He palapala Python e noi ana i ka API a waiho i ka pane i loko o kahi waihona i nānā ʻia e ka Kinesis Agent;
  • Nā kahawai ʻikepili Kinesis - ka lawelawe hoʻoheheʻe ʻikepili i ka manawa maoli me nā mana scaling ākea;
  • Kinesis Analytics he lawelawe serverless e hoʻomaʻamaʻa i ka nānā ʻana i ka ʻikepili kahe i ka manawa maoli. Hoʻonohonoho ʻo Amazon Kinesis Data Analytics i nā kumuwaiwai noiʻi a hoʻopaʻa maʻalahi e mālama i kekahi nui o ka ʻikepili e hiki mai ana;
  • ʻO AWS Lambda - he lawelawe e hiki ai iā ʻoe ke holo i ke code me ke kākoʻo ʻole a hoʻonohonoho paha i nā kikowaena. Hoʻonui ʻia ka mana helu helu āpau no kēlā me kēia kelepona;
  • Amazon DynamoDB - He waihona o nā hui waiwai kī a me nā palapala e hāʻawi ana i ka latency ma lalo o 10 milliseconds i ka wā e holo ana ma kēlā me kēia pālākiō. Ke hoʻohana nei iā DynamoDB, ʻaʻohe pono e hoʻolako, hoʻopaʻa, a hoʻokele paha i nā kikowaena. Hoʻopololei ʻo DynamoDB i nā papa e hoʻoponopono i ka nui o nā kumuwaiwai i loaʻa a mālama i ka hana kiʻekiʻe. ʻAʻole koi ʻia ka hoʻokele ʻōnaehana;
  • Amazon SNS - he lawelawe i hoʻokele piha ʻia no ka hoʻouna ʻana i nā leka me ka hoʻohana ʻana i ke kumu hoʻohālike o ka mea hoʻopuka-subscriber (Pub/Sub), me ia e hiki ai iā ʻoe ke hoʻokaʻawale i nā microservices, nā ʻōnaehana puʻupuʻu a me nā noi serverless. Hiki ke hoʻohana ʻia ʻo SNS no ka hoʻouna ʻana i ka ʻike i nā mea hoʻohana hope ma o nā leka uila, nā leka SMS a me nā leka uila.

Aʻo mua

No ka hoʻohālikelike ʻana i ka kahe ʻikepili, ua hoʻoholo wau e hoʻohana i ka ʻike tiketi mokulele i hoʻihoʻi ʻia e ka Aviasales API. IN palapala He papa inoa nui o nā ʻano hana like ʻole, e lawe mākou i kekahi o lākou - "Mahina Kūʻai Kūʻai", e hoʻihoʻi i nā kumukūʻai no kēlā me kēia lā o ka mahina, i hui ʻia e ka helu o nā hoʻololi. Inā ʻaʻole ʻoe e kuhikuhi i ka mahina ʻimi ma ka noi, e hoʻihoʻi ʻia ka ʻike no ka mahina ma hope o ka mahina o kēia manawa.

No laila, e kākau inoa a kiʻi i kā mākou hōʻailona.

Aia kekahi laʻana noi ma lalo nei:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

ʻO ke ala ma luna o ka loaʻa ʻana o ka ʻikepili mai ka API ma ke kuhikuhi ʻana i kahi hōʻailona ma ka noi e hana, akā makemake wau e hāʻawi i ka hōʻailona komo ma ke poʻo, no laila e hoʻohana mākou i kēia ʻano i ka palapala api_caller.py.

Laʻana pane:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Hōʻike ka laʻana o ka pane API ma luna nei i kahi tikiki mai St. Petersburg a i Phuk... ʻAe, he moeʻuhane ...
No ka mea, no Kazan wau, a ʻo Phuket i kēia manawa "he moeʻuhane wale nō", e ʻimi kākou i nā tiketi mai St. Petersburg a i Kazan.

Manaʻo ia ua loaʻa iā ʻoe kahi moʻokāki AWS. Makemake wau e huki koke i ka nānā kūikawā i ka ʻoiaʻiʻo ʻo Kinesis a me ka hoʻouna ʻana i nā leka hoʻomaopopo ma o SMS ʻaʻole i hoʻokomo ʻia i ka makahiki ʻĀpana manuahi (hoʻohana manuahi). Akā ʻoiai me kēia, me nā kālā ʻelua i ka noʻonoʻo, hiki ke kūkulu i ka ʻōnaehana i manaʻo ʻia a pāʻani pū me ia. A, ʻoiaʻiʻo, mai poina e holoi i nā kumuwaiwai āpau ma hope o ka pono ʻole.

ʻO ka mea pōmaikaʻi, e noa nā hana DynamoDb a me lambda iā mākou inā mākou e hoʻokō i kā mākou mau palena manuahi o kēlā me kēia mahina. No ka laʻana, no DynamoDB: 25 GB o ka waiho ʻana, 25 WCU/RCU a me 100 miliona mau nīnau. A he miliona lambda hana kelepona i kēlā me kēia mahina.

Hoʻolālā ʻōnaehana manual

Hoʻonohonoho i nā kahawai ʻikepili Kinesis

E hele kāua i ka lawelawe Kinesis Data Streams a hana i ʻelua kahawai hou, hoʻokahi shard no kēlā me kēia.

He aha ka ʻāpana?
ʻO kahi shard ka ʻāpana hoʻoili ʻikepili kumu o kahi kahawai Amazon Kinesis. Hoʻokahi māhele e hāʻawi i ka hoʻoili ʻikepili hoʻokomo i ka wikiwiki o 1 MB/s a me ka hoʻoili ʻikepili puka ma ka wikiwiki o 2 MB/s. Kākoʻo kekahi ʻāpana a hiki i 1000 mau helu PUT i kēlā me kēia kekona. I ka hana ʻana i kahi kahawai data, pono ʻoe e kuhikuhi i ka nui o nā ʻāpana i makemake ʻia. No ka laʻana, hiki iā ʻoe ke hana i kahi kahawai data me nā ʻāpana ʻelua. Hāʻawi kēia kahawai ʻikepili i ka hoʻoili ʻana i ka ʻikepili hoʻokomo ma 2 MB/s a me ka hoʻoili ʻana i ka ʻikepili puka ma 4 MB/s, e kākoʻo ana i nā moʻolelo PUT 2000 i kēlā me kēia kekona.

ʻOi aku ka nui o nā shards i kāu kahawai, ʻoi aku ka nui o kāna kahe. Ma ke kumu, penei ka hoʻonui ʻia ʻana o nā kahe - ma ka hoʻohui ʻana i nā shards. Akā ʻoi aku ka nui o nā shards iā ʻoe, ʻoi aku ka kiʻekiʻe o ke kumukūʻai. He 1,5 keneta no kēlā me kēia hola a he 1.4 keneta hou no kēlā me kēia miliona PUT uku uku.

E hana kākou i kahawai hou me ka inoa mokulele_tickets, ua lawa ka 1 shard nona.

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
I kēia manawa, e hana kākou i kekahi pae me ka inoa special_stream:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless

Hoʻonohonoho mea hana

No ka nānā ʻana i kahi hana, ua lawa ka hoʻohana ʻana i kahi hiʻohiʻona EC2 maʻamau ma ke ʻano he mea hana ʻikepili. ʻAʻole pono e lilo ia i mīkini makamae ikaika a pipiʻi; e hana maikaʻi ana kahi wahi t2.micro.

Mea nui: no ka laʻana, pono ʻoe e hoʻohana i ke kiʻi - Amazon Linux AMI 2018.03.0, ʻoi aku ka liʻiliʻi o nā hoʻonohonoho no ka hoʻomaka koke ʻana i ka Kinesis Agent.

E hele i ka lawelawe EC2, e hana i ka mīkini virtual hou, e koho i ka AMI i makemake ʻia me ke ʻano t2.micro, i hoʻokomo ʻia i ka Free Tier:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
I mea e hiki ai i ka mīkini makamae hou ke hana me ka lawelawe Kinesis, pono e hāʻawi ʻia nā kuleana e hana pēlā. ʻO ke ala maikaʻi loa e hana ai i kēia, ʻo ka hāʻawi ʻana i kahi kuleana IAM. No laila, ma ka ʻanuʻu 3: Configure Instance Details screen, pono ʻoe e koho E hana i ka hana IAM hou:

Ke hana nei i kahi hana IAM no EC2
Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Ma ka puka aniani e wehe ana, koho mākou e hana ana mākou i kahi hana hou no EC2 a hele i ka ʻae ʻae:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Me ka hoʻohana ʻana i ka laʻana hoʻomaʻamaʻa, ʻaʻole pono mākou e komo i nā mea paʻakikī o ka hoʻonohonoho kikoʻī o nā kuleana waiwai, no laila e koho mākou i nā kulekele i hoʻonohonoho mua ʻia e Amazon: AmazonKinesisFullAccess a me CloudWatchFullAccess.

E hāʻawi i kekahi inoa koʻikoʻi no kēia kuleana, no ka laʻana: EC2-KinesisStreams-FullAccess. Pono ka hopena e like me ka mea i hōʻike ʻia ma ke kiʻi ma lalo nei:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Ma hope o ka hoʻokumu ʻana i kēia hana hou, mai poina e hoʻopili iā ia i ke ʻano mīkini virtual i hana ʻia:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
ʻAʻole mākou e hoʻololi i kekahi mea ʻē aʻe ma kēia pale a neʻe i nā puka makani aʻe.

Hiki ke waiho ʻia nā hoʻonohonoho paʻa paʻa ma ke ʻano he paʻamau, a me nā hōʻailona (ʻoiai he hana maikaʻi ka hoʻohana ʻana i nā hōʻailona, ​​​​ma ka liʻiliʻi e hāʻawi i kahi inoa a hōʻike i ke kaiapuni).

Aia mākou ma ka ʻanuʻu 6: Configure Security Group tab, kahi e pono ai ʻoe e hana i kahi mea hou a i ʻole e kuhikuhi i kāu pūʻulu palekana i loaʻa, e hiki ai iā ʻoe ke hoʻohui ma o ssh (port 22) i ka laʻana. E koho i ke Puna -> Koʻu IP ma laila a hiki iā ʻoe ke hoʻomaka i ke kumu.

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Ke hoʻololi koke ia i ke kūlana holo, hiki iā ʻoe ke hoʻāʻo e hoʻopili iā ia ma o ssh.

I mea e hiki ai ke hana pū me Kinesis Agent, ma hope o ka hoʻopili ʻana i ka mīkini, pono ʻoe e hoʻokomo i kēia mau kauoha i ka pahu:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

E hana kākou i waihona e mālama i nā pane API:

sudo mkdir /var/log/airline_tickets

Ma mua o ka hoʻomaka ʻana i ka ʻelele, pono ʻoe e hoʻonohonoho i kāna config:

sudo vim /etc/aws-kinesis/agent.json

Penei ke ano o na mea o ka waihona agent.json:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

E like me ka mea i ʻike ʻia mai ka faila hoʻonohonoho, e nānā ka luna i nā faila me ka .log extension i loko o ka /var/log/airline_tickets/ directory, parse a hoʻoili iā lākou i ke kahawai airline_tickets.

Hoʻomaka hou mākou i ka lawelawe a hōʻoia i ka holo ʻana:

sudo service aws-kinesis-agent restart

I kēia manawa e hoʻoiho i ka palapala Python e noi ai i ka ʻikepili mai ka API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Ke noi nei ka palapala api_caller.py i ka ʻikepili mai Aviasales a mālama i ka pane i loaʻa ma ka papa kuhikuhi a ka ʻelele Kinesis e nānā. He kūlana maʻamau ka hoʻokō ʻana o kēia palapala, aia kahi papa TicketsApi, hiki iā ʻoe ke huki asynchronously i ka API. Hāʻawi mākou i kahi poʻomanaʻo me kahi hōʻailona a noi i nā ʻāpana i kēia papa:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

No ka hoʻāʻo ʻana i nā hoʻonohonoho kūpono a me nā hana o ka ʻelele, e hoʻāʻo kākou e holo i ka palapala api_caller.py:

sudo ./api_caller.py TOKEN

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
A ke nānā nei mākou i ka hopena o ka hana ma ka Agent logs a ma ka ʻaoʻao Monitoring ma ka airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
E like me kāu e ʻike ai, hana nā mea āpau a hoʻouna maikaʻi ka Kinesis Agent i ka ʻikepili i ke kahawai. I kēia manawa e hoʻonohonoho i ka mea kūʻai.

Hoʻonohonoho i ka Kinesis Data Analytics

E neʻe kākou i ke kikowaena o ka ʻōnaehana holoʻokoʻa - hana i kahi noi hou ma Kinesis Data Analytics i kapa ʻia ʻo kinesis_analytics_airlines_app:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
ʻAe ʻo Kinesis Data Analytics iā ʻoe e hana i ka ʻikepili ʻikepili manawa maoli mai Kinesis Streams me ka hoʻohana ʻana i ka ʻōlelo SQL. He lawelawe autoscaling piha (ʻaʻole like me Kinesis Streams) e:

  1. hiki iā ʻoe ke hana i nā kahawai hou (Output Stream) ma muli o nā noi i ka ʻikepili kumu;
  2. hāʻawi i kahi kahawai me nā hewa i hana ʻia i ka wā e holo ana nā noi (Error Stream);
  3. hiki ke ho'oholo 'akomi i ka ho'olālā 'ikepili ho'okomo (hiki ke ho'oponopono hou 'ia inā pono).

ʻAʻole kēia he lawelawe haʻahaʻa - 0.11 USD i kēlā me kēia hola o ka hana, no laila pono ʻoe e hoʻohana pono a holoi iā ia ke pau ʻoe.

E hoʻohui i ka palapala noi i ke kumu ʻikepili:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
E koho i ke kahawai a mākou e hoʻopili ai (airline_tickets):

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
A laila, pono ʻoe e hoʻopili i kahi Role IAM hou i hiki i ka palapala noi ke heluhelu mai ke kahawai a kākau i ke kahawai. No ka hana ʻana i kēia, ua lawa ʻaʻole e hoʻololi i kekahi mea ma ka poloka ʻae Access:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
I kēia manawa e noi mākou i ka ʻike ʻana i ka schema data ma ke kahawai; e hana i kēia, kaomi i ke pihi "Discover schema". ʻO ka hopena, e hōʻano hou ʻia ka hana IAM (e hana ʻia kahi mea hou) a e hoʻomaka ʻia ka ʻike ʻana i ka schema mai ka ʻikepili i hōʻea i ke kahawai:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
I kēia manawa pono ʻoe e hele i ka hoʻoponopono SQL. Ke kaomi ʻoe i kēia pihi, e ʻike ʻia kahi puka aniani e noi ana iā ʻoe e hoʻomaka i ka noi - koho i kāu mea e makemake ai e hoʻomaka:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
E hoʻokomo i kēia nīnau maʻalahi i loko o ka puka makani hoʻoponopono SQL a kaomi iā Save a holo SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Ma nā ʻikepili pili, hana ʻoe me nā papa me ka hoʻohana ʻana i nā ʻōlelo INSERT e hoʻohui i nā moʻolelo a me kahi ʻōlelo SELECT e nīnau i ka ʻikepili. Ma Amazon Kinesis Data Analytics, hana ʻoe me nā kahawai (STREAMs) a me nā pumps (PUMPs)—noi hoʻokomo mau e hoʻokomo i ka ʻikepili mai kekahi kahawai i loko o kahi noi i kahi kahawai ʻē aʻe.

ʻO ka nīnau SQL i hōʻike ʻia ma luna e ʻimi ana i nā tiketi Aeroflot ma ke kumu kūʻai ma lalo o ʻelima kaukani rubles. E waiho ʻia nā moʻolelo a pau i kūpono i kēia mau kūlana ma ke kahawai DESTINATION_SQL_STREAM.

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Ma ka poloka Destination, koho i ke kahawai special_stream, a ma ka inoa kahawai In-application DESTINATION_SQL_STREAM drop-down list:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
ʻO ka hopena o nā manipulations a pau e like me ke kiʻi ma lalo nei:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless

Ke hana ʻana a me ke kau inoa ʻana i kahi kumuhana SNS

E hele i ka Simple Notification Service a hana i kahi kumuhana hou ma laila me ka inoa ʻo Airlines:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
E kau inoa i kēia kumuhana a hōʻike i ka helu kelepona hele e hoʻouna ʻia ai nā leka SMS:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless

E hana i papa ma DynamoDB

No ka mālama ʻana i ka ʻikepili maka mai ko lākou kahawai airline_tickets, e hana kākou i papa ma DynamoDB me ka inoa like. E hoʻohana mākou i record_id ma ke kī mua:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless

Ke hana ʻana i kahi ʻohi hana lambda

E hana kākou i kahi hana lambda i kapa ʻia ʻO Collector, nona ka hana e koho i ke kahawai airline_tickets a inā loaʻa nā moʻolelo hou ma laila, e hoʻokomo i kēia mau moʻolelo i ka papa DynamoDB. ʻIke loa, ma waho aʻe o nā kuleana paʻamau, pono e heluhelu kēia lambda i ke kahawai ʻike Kinesis a kākau i ke komo ʻana iā DynamoDB.

Ke hana ʻana i kahi hana IAM no ka hana lambda ʻohi
ʻO ka mua, e hana kākou i kahi hana IAM hou no ka lambda i kapa ʻia ʻo Lambda-TicketsProcessingRole:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
No ka laʻana hoʻāʻo, kūpono nā kulekele AmazonKinesisReadOnlyAccess a me AmazonDynamoDBFullAccess i hoʻonohonoho mua ʻia, e like me ka hōʻike ʻana ma ke kiʻi ma lalo nei.

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless

Pono e hoʻokuʻu ʻia kēia lambda e kahi kumu mai Kinesis i ka wā e komo ai nā mea hou i ka airline_stream, no laila pono mākou e hoʻohui i kahi mea hou:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
ʻO nā mea i koe e hoʻokomo i ke code a mālama i ka lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Ke hana ʻana i kahi leka hoʻomaopopo hana lambda

ʻO ka lua o ka lambda hana, e nānā i ka lua o ke kahawai (special_stream) a hoʻouna i kahi leka hoʻomaopopo iā SNS, hana ʻia ma ke ʻano like. No laila, pono e loaʻa i kēia lambda ke heluhelu mai Kinesis a hoʻouna i nā leka i kahi kumuhana SNS i hāʻawi ʻia, a laila e hoʻouna ʻia e ka lawelawe SNS i nā mea kākau inoa a pau o kēia kumuhana (leka uila, SMS, etc.).

Ke hana nei i kahi hana IAM
ʻO ka mua, hana mākou i ka hana IAM Lambda-KinesisAlarm no kēia lambda, a laila hāʻawi i kēia kuleana i ka alarm_notifier lambda e hana ʻia nei:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless

Pono kēia lambda e hana i kahi hoʻoheheʻe no nā moʻolelo hou e komo i ka special_stream, no laila pono ʻoe e hoʻonohonoho i ke kumu e like me kā mākou i hana ai no ka Collector lambda.

I mea e maʻalahi ai ka hoʻonohonoho ʻana i kēia lambda, e hoʻolauna i kahi ʻano hoʻololi kaiapuni hou - TOPIC_ARN, kahi a mākou e kau ai i ka ANR (Amazon Recourse Names) o ke kumuhana Airlines:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
A hoʻokomo i ka code lambda, ʻaʻole paʻakikī loa:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Me he mea lā ʻo kēia kahi i hoʻopau ʻia ai ka hoʻonohonoho ʻōnaehana manual. ʻO nā mea i koe e hoʻāʻo a hōʻoia ua hoʻonohonoho pono mākou i nā mea āpau.

Hoʻouna mai ka code Terraform

Hoʻomākaukau pono

ʻOhana hoʻoponopono He mea hana maʻalahi loa ia no ka lawe ʻana i nā ʻōnaehana mai ka code. Loaʻa iā ia kāna syntax ponoʻī e maʻalahi e aʻo a he nui nā hiʻohiʻona o ke ʻano a me ka mea e hoʻolaha ai. ʻO ka mea hoʻoponopono Atom a i ʻole Visual Studio Code he nui nā plugins maʻalahi e maʻalahi ka hana me Terraform.

Hiki iā ʻoe ke hoʻoiho i ka puʻunaue mai kēia wahi. ʻO ka ʻike kikoʻī o nā mana Terraform āpau ma waho o ke kiko o kēia ʻatikala, no laila e kaupalena mākou iā mākou iho i nā kumu nui.

Pehea e hoʻomaka ai

ʻO ke code piha o ka papahana i loko o koʻu waihona. Hoʻopili mākou i ka waihona iā mākou iho. Ma mua o ka hoʻomaka ʻana, pono ʻoe e hōʻoia ua hoʻokomo a hoʻonohonoho ʻia ʻo AWS CLI, no ka mea ... E ʻimi ʻo Terraform i nā hōʻoia ma ka faila ~/.aws/credentials.

ʻO kahi hoʻomaʻamaʻa maikaʻi ʻo ka holo ʻana i ke kauoha hoʻolālā ma mua o ka hoʻohana ʻana i ka ʻōnaehana holoʻokoʻa e ʻike i ka mea a Terraform e hana nei no mākou i ke ao:

terraform.exe plan

E koi ʻia ʻoe e hoʻokomo i kahi helu kelepona e hoʻouna ai i nā leka hoʻomaopopo. ʻAʻole pono ke komo ʻana i kēia pae.

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Ma hope o ka nānā ʻana i ka hoʻolālā hana o ka papahana, hiki iā mākou ke hoʻomaka i ka hana ʻana i nā kumuwaiwai:

terraform.exe apply

Ma hope o ka hoʻouna ʻana i kēia kauoha, e noi hou ʻia ʻoe e hoʻokomo i kahi helu kelepona; kāomi "ʻae" ke hōʻike ʻia kahi nīnau e pili ana i ka hana maoli ʻana i nā hana. E ʻae kēia iā ʻoe e hoʻonohonoho i ka ʻōnaehana holoʻokoʻa, e hoʻokō i nā hoʻonohonoho pono āpau o EC2, e kau i nā hana lambda, etc.

Ma hope o ka hoʻokumu ʻia ʻana o nā kumuwaiwai āpau ma o ka code Terraform, pono ʻoe e hele i nā kikoʻī o ka noi Kinesis Analytics (akā, ʻaʻole wau i ʻike pehea e hana pololei ai mai ke code).

E hoʻomaka i ka polokalamu:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Ma hope o kēia, pono ʻoe e hoʻonohonoho pono i ka inoa kahawai i loko o ka noi ma ke koho ʻana mai ka papa inoa hāʻule iho:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
I kēia manawa ua mākaukau nā mea a pau e hele.

Ke ho'āʻo nei i ka noi

ʻO ke ʻano o kāu hoʻoili ʻana i ka ʻōnaehana, me ka lima a ma o ka code Terraform, e hana like ia.

Hoʻokomo mākou ma o SSH i ka mīkini virtual EC2 kahi i hoʻokomo ʻia ai ʻo Kinesis Agent a holo i ka palapala api_caller.py

sudo ./api_caller.py TOKEN

ʻO nā mea a pau āu e hana ai, e kali i kahi SMS i kāu helu:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
SMS - hiki mai ka memo ma ke kelepona ma kahi o 1 minuke:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless
Ke waiho nei ia e ʻike inā ua mālama ʻia nā moʻolelo i ka waihona ʻo DynamoDB no ka nānā ʻana aʻe. Aia i loko o ka papa kuhikuhi airline_tickets kēia mau ʻikepili:

Hoʻohui ʻia ʻo Aviasales API me Amazon Kinesis a me ka maʻalahi o ka serverless

hopena

I ka wā o ka hana i hana ʻia, ua kūkulu ʻia kahi ʻōnaehana ʻikepili pūnaewele ma muli o Amazon Kinesis. Ua noʻonoʻo ʻia nā koho no ka hoʻohana ʻana i ka Kinesis Agent i hui pū me nā Kinesis Data Streams a me nā ʻikepili maoli ʻo Kinesis Analytics me ka hoʻohana ʻana i nā kauoha SQL, a me ka launa pū ʻana o Amazon Kinesis me nā lawelawe AWS ʻē aʻe.

Ua kau mākou i ka ʻōnaehana ma luna ma nā ʻano ʻelua: kahi manual lōʻihi a me kahi wikiwiki mai ka code Terraform.

Loaʻa nā code kumu papahana a pau ma kaʻu waihona GitHub, Manaʻo wau e hoʻomaʻamaʻa ʻoe iā ʻoe iho me ia.

Hauʻoli wau e kūkākūkā i ka ʻatikala, ke kakali nei au i kāu ʻōlelo. Manaʻo wau no ka hoʻohewa ʻana.

Makemake wau e holomua ʻoe!

Source: www.habr.com

Pākuʻi i ka manaʻo hoʻopuka