Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau

Hei Habr!

He pai ki a koe nga rererangi rererangi? He pai ki a au, engari i te wa e noho mokemoke ana ahau i aroha ki te tarai i nga raraunga mo nga tikiti rererangi mai i tetahi rauemi rongonui - Aviasales.

I tenei ra ka wetewetehia e matou nga mahi a Amazon Kinesis, te hanga i te punaha rerema me nga tātaritanga-a-waa, te whakauru i te paataka raraunga Amazon DynamoDB NoSQL hei rokiroki raraunga matua, me te whakarite whakamohiotanga SMS mo nga tikiti whakamere.

Ko nga korero katoa kei raro i te tapahi! Haere!

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau

Whakataki

Mo te tauira, me uru tatou ki Aviales API. Ko te urunga ki a ia ka whakawhiwhia ki te kore utu me te kore here; me rehita noa koe ki te waahanga "Kaiwhakawhanake" ki te tango i to tohu tohu API ki te uru ki nga raraunga.

Ko te kaupapa matua o tenei tuhinga he whakamarama whanui mo te whakamahinga o nga korero whakawhiti korero i roto i te AWS; ka mahara matou ko nga raraunga i whakahokia mai e te API i whakamahia kaore i te tino hou, ka tukuna mai i te keteroki, ara i hangaia i runga i nga rapunga a nga kaiwhakamahi o nga pae Aviasales.ru me Jetradar.com mo nga haora 48 whakamutunga.

Ko te Kinesis-agent, i whakauruhia ki runga i te miihini whakaputa, ka riro mai ma te API ka tohatoha aunoa me te tuku raraunga ki te awa e hiahiatia ana ma te Kinesis Data Analytics. Ko te putanga mata o tenei awa ka tuhia tika ki te toa. Ko te rokiroki raraunga mata kua tukuna ki DynamoDB ka taea te tātari tiiti hohonu ma nga taputapu BI, penei i te AWS Quick Sight.

Ka whakaarohia e maatau nga waahanga e rua mo te tuku i nga hanganga katoa:

  • ā-ringa - mā te Papatohu Whakahaere AWS;
  • Ko nga hanganga mai i te waehere Terraform mo nga miihini mangere;

Te hoahoanga o te punaha kua whakawhanakehia

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Wae i whakamahia:

  • Aviasales API — ko nga raraunga i whakahokia mai e tenei API ka whakamahia mo nga mahi katoa ka whai ake;
  • EC2 Kaihanga Tauira — he miihini mariko i roto i te kapua ka puta te awa raraunga whakauru:
    • Kaihokohoko Kinesis he tono Java kua whakauruhia ki runga i te miihini e whakarato ana i te huarahi ngawari ki te kohikohi me te tuku raraunga ki Kinesis (Kinesis Data Streams or Kinesis Firehose). Ka aroturuki tonu te kaihoko i te huinga o nga konae kei roto i nga raarangi kua tohua me te tuku raraunga hou ki Kinesis;
    • Hōtuhi Kaiwaea API — He tuhinga Python e tono tono ana ki te API me te tuku i te whakautu ki roto i te kōpaki ka tirohia e te Kinesis Agent;
  • Nga Raraunga Kinesis — ratonga roma raraunga tuuturu me te kaha whakahiato whanui;
  • Kinesis tātaritanga he ratonga karekau he whakamaarama i te tātaritanga o te rerenga raraunga i te waa tuuturu. Ka whirihorahia e te Amazon Kinesis Data Analytics nga rauemi tono me te tauine aunoa ki te hapai i nga rahinga o nga raraunga taumai;
  • Tuhinga o mua — he ratonga e taea ai e koe te whakahaere waehere me te kore e tautoko, e whakarite ana ranei i nga kaitoro. Ko nga mana rorohiko katoa ka whakatauhia aunoa mo ia waea;
  • Amazon DynamoDB - He pātengi raraunga o nga takirua uara-matua me nga tuhinga e whakarato ana i te roanga iti iho i te 10 manomanohakona ina rere ana i tetahi tauine. I a koe e whakamahi ana i te DynamoDB, kaore koe e hiahia ki te whakarato, ki te papaki, ki te whakahaere ranei i nga kaitoro. Ka whakatauira aunoa a DynamoDB i nga ripanga hei whakatika i te nui o nga rauemi e waatea ana me te pupuri i nga mahi teitei. Kaore e hiahiatia he whakahaere punaha;
  • Amazon SNS - he ratonga tino whakahaere mo te tuku karere ma te whakamahi i te tauira kaiohauru-whakaputa (Pub/Sub), e taea ai e koe te wehe i nga ratonga miihini, nga punaha toha me nga tono kore utu. Ka taea te whakamahi SNS ki te tuku korero ki nga kaiwhakamahi mutunga ma nga panui pana waea pūkoro, karere SMS me nga imeera.

Whakangungu tuatahi

Hei whai i te rerenga raraunga, i whakatau ahau ki te whakamahi i nga korero tikiti rererangi i whakahokia mai e te Aviasales API. IN tuhinga he rarangi whanui mo nga tikanga rereke, me tango tetahi o ratou - "Maramataka Utu Marama", e whakahoki mai ana i nga utu mo ia ra o te marama, ka whakarōpūhia e te maha o nga whakawhitinga. Ki te kore koe e tohu i te marama rapu i roto i te tono, ka whakahokia mai nga korero mo te marama i muri mai o te marama o naianei.

Na, me rehita me te tiki i ta tatou tohu.

He tauira tono kei raro:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Ko te tikanga i runga ake nei mo te tango raraunga mai i te API ma te tohu tohu i roto i te tono ka mahi, engari ka pai ake ahau ki te tuku i te tohu uru ki roto i te pane, na reira ka whakamahia e matou tenei tikanga i roto i te tuhinga api_caller.py.

Tauira whakautu:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Ko te tauira whakautu API i runga ake nei e whakaatu ana i tetahi tikiti mai i St. Petersburg ki Phuk... Aue, he moemoea...
I te mea no Kazan ahau, a ko Phuket he "moemoea" inaianei, me rapu nga tikiti mai i St. Petersburg ki Kazan.

Kei te whakaaro kei a koe he putea AWS. E hiahia ana ahau ki te aro nui ki te meka ko Kinesis me te tuku whakamohiotanga ma te SMS kaore i whakauruhia ki te tau Taumata Kore (whakamahi kore utu). Engari ahakoa tenei, me nga taara e rua i roto i te hinengaro, ka taea te hanga i te punaha e whakaarohia ana me te takaro ki a ia. A, o te akoranga, kaua e wareware ki te muku i nga rauemi katoa i muri i te kore e hiahiatia.

Waimarie, ka waatea nga mahi DynamoDb me te lambda ki a maatau ki te tutuki i a maatau rohe kore utu ia marama. Hei tauira, mo DynamoDB: 25 GB o te rokiroki, 25 WCU / RCU me te 100 miriona patai. A he miriona waea mahi lambda ia marama.

Te whakatakoto a-ringa i te punaha

Te whakatu i nga awa Raraunga Kinesis

Me haere tatou ki te ratonga Kinesis Data Streams ka hanga e rua nga awa hou, kotahi marau mo ia.

He aha te kongakonga?
Ko te kongakonga te waahanga whakawhiti raraunga taketake o te awa Amazon Kinesis. Ko tetahi waahanga e whakarato ana i te whakawhiti raraunga whakauru i te tere o te 1 MB/s me te whakawhiti raraunga whakaputa i te tere o te 2 MB/s. Ka tautoko tetahi waahanga ki te 1000 nga whakaurunga PUT mo ia hekona. I te wa e hanga ana he awa raraunga, me tohu e koe te maha o nga waahanga e hiahiatia ana. Hei tauira, ka taea e koe te hanga i tetahi awa raraunga me nga waahanga e rua. Ka tukuna e tenei awa raraunga te whakawhiti raraunga whakauru i te 2 MB/s me te whakawhiti raraunga whakaputa i te 4 MB/s, ka tautoko ake ki te 2000 PUT rekoata ia hekona.

Ko te nui ake o nga kongakonga i roto i to awa, ka nui ake te whakaputanga. Ko te tikanga, he penei te whakamaaramatanga o nga rerenga - ma te taapiri i nga kongakonga. Engari ka nui ake nga maramara kei a koe, ka nui ake te utu. He 1,5 heneti te utu mo ia maramara mo ia haora me te taapiri 1.4 heneti mo ia miriona mo nga waahanga utu PUT.

Me hanga he awa hou me te ingoa rererangi_tiketi, 1 te maramara ka ranea mona:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Inaianei me hanga tetahi atu miro me te ingoa special_stream:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau

Tatūnga Kaihanga

Hei tātari i tetahi mahi, he nui noa te whakamahi i te tauira EC2 auau hei kaihanga raraunga. Ehara i te mea he miihini mariko kaha, utu nui; ka pai te waahi t2.micro.

Tuhipoka nui: hei tauira, me whakamahi koe i te ahua - Amazon Linux AMI 2018.03.0, he iti ake nga tautuhinga mo te whakarewa tere i te Kinesis Agent.

Haere ki te ratonga EC2, hangaia he miihini mariko hou, tohua te AMI e hiahiatia ana me te momo t2.micro, kei roto i te Tier Free:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Kia taea ai e te miihini mariko hou te hanga ki te mahi tahi me te ratonga Kinesis, me whai mana ki te mahi pera. Ko te huarahi pai ki te mahi i tenei ko te tautapa i tetahi Mahi IAM. Na reira, i runga i te Hipanga 3: Whirihorahia nga Taipitopito Matapihi, me tohu koe Waihangahia te Turanga IAM hou:

Te hanga mahi IAM mo EC2
Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
I te matapihi e tuwhera ana, tohua kei te hangaia e matou he mahi hou mo EC2 ka haere ki te waahanga Whakaaetanga:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Ma te whakamahi i te tauira whakangungu, kaore matou e uru ki nga ahuatanga katoa o te whirihoranga mokamoka mo nga mana rauemi, na reira ka kowhiria e matou nga kaupapa here i whirihorahia e Amazon: AmazonKinesisFullAccess me CloudWatchFullAccess.

Me hoatu he ingoa whai kiko mo tenei mahi, hei tauira: EC2-KinesisStreams-FullAccess. Me rite te hua ki te pikitia i raro nei:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Whai muri i te hanga i tenei mahi hou, kaua e wareware ki te whakapiri atu ki te tauira miihini mariko i hangaia:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Kaore matou e whakarereke i tetahi atu mea i runga i tenei mata ka neke atu ki nga matapihi e whai ake nei.

Ka taea te waiho i nga tautuhinga puku pakeke hei taunoa, me nga tohu (ahakoa he pai te mahi ki te whakamahi i nga tohu, me hoatu he ingoa mo te tauira me te tohu i te taiao).

Inaianei kei runga tatou i te Hipanga 6: Whirihorahia te Rōpū Haumarutanga ripa, kei reira koe e hiahia ana ki te hanga i tetahi mea hou, ki te tautuhi ranei i to roopu Haumarutanga o naianei, e taea ai e koe te hono ma te ssh (tauranga 22) ki te tauira. Tīpakohia te Puna -> Ko taku IP kei reira ka taea e koe te whakarewa i te tauira.

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Ka huri ki te mana whakahaere, ka taea e koe te ngana ki te hono atu ma te ssh.

Kia taea ai e koe te mahi me te Kinesis Agent, i muri i te hono pai ki te miihini, me whakauru koe i nga whakahau e whai ake nei ki te tauranga:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Hangaia he kōpaki hei tiaki i nga whakautu API:

sudo mkdir /var/log/airline_tickets

I mua i te tiimata i te kaihoko, me whirihora e koe tana whirihora:

sudo vim /etc/aws-kinesis/agent.json

Ko nga ihirangi o te kōnae agent.json me penei te ahua:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Ka kitea mai i te konae whirihoranga, ka aro turuki te kaihoko i nga konae me te toronga .log i roto i te raarangi /var/log/airline_tickets/, ka poroporoaki ka whakawhiti ki te awa rererangi_tickets.

Ka timata ano tatou i te ratonga me te whakarite kei te mahi tonu:

sudo service aws-kinesis-agent restart

Inaianei me tango te tuhinga Python ka tono raraunga mai i te API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Ka tono te tuhinga api_caller.py i nga raraunga mai i a Aviasales ka tiakina te whakautu kua tae mai ki te raarangi ka tirotirohia e te kaihoko Kinesis. Ko te whakatinanatanga o tenei tuhinga he tino paerewa, kei reira he akomanga TicketsApi, ka taea e koe te kumea te API. Ka tukuna he pane me te tohu ka tono tawhā ki tenei akomanga:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Hei whakamatautau i nga tautuhinga tika me te mahi a te kaihoko, me whakamatau te whakahaere i te tuhinga api_caller.py:

sudo ./api_caller.py TOKEN

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
A ka titiro matou ki nga hua o nga mahi i roto i nga raarangi Agent me te ripa Aroturuki i te awa raraunga rererangi_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Ka taea e koe te kite, ka mahi nga mea katoa, ka tukuna pai e te Kinesis Agent nga raraunga ki te awa. Inaianei me whirihora nga kaihoko.

Te whakarite i te Kinesis Data Analytics

Me neke atu ki te waahanga matua o te punaha katoa - hanga he tono hou ki te Kinesis Data Analytics ko kinesis_analytics_airlines_app:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Ko te Kinesis Data Analytics ka taea e koe te mahi i nga waahanga raraunga tuuturu mai i nga Kinesis Streams ma te whakamahi i te reo SQL. He ratonga autoscaling katoa (kaore i rite ki nga Kinesis Streams) e:

  1. ka taea e koe te hanga i nga awa hou (Waea Huaputa) i runga i nga tono ki te puna raraunga;
  2. ka whakarato i te awa me nga hapa i puta i te wa e rere ana nga tono (Awa Hapa);
  3. ka taea te whakatau aunoa i te kaupapa raraunga whakauru (ka taea te tautuhi-a-ringa ina tika).

Ehara tenei i te ratonga iti - 0.11 USD ia haora o te mahi, no reira me ata whakamahi koe ka muku ina oti ana koe.

Me hono te tono ki te puna raraunga:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Tīpakohia te roma ka hono atu mātou (airline_tickets):

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
I muri mai, me whakauru koe i tetahi Mahi IAM hou kia taea e te tono te panui mai i te awa me te tuhi ki te awa. Hei mahi i tenei, he nui kia kaua e whakarereke i tetahi mea i roto i te paraka whakaaetanga Uru:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Inaianei me tono kia kitea te aronuinga raraunga i roto i te awa; ki te mahi i tenei, paatohia te paatene "Tirohia te aronuinga". Ko te mutunga, ka whakahōuhia te tūranga IAM (ka hangaia he mea hou) ka whakarewahia te rapu aronuinga mai i nga raraunga kua tae mai ki te awa:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Inaianei me haere koe ki te ētita SQL. Ka paato koe i tenei paatene, ka puta he matapihi e tono ana kia whakarewahia te tono - tohua he aha e hiahia ana koe ki te whakarewa:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Kōkuhuhia te pātai māmā e whai ake nei ki te matapihi ētita SQL ka pāwhiritia te Tiaki me te Whakahaere SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

I roto i nga papaunga paaunga, ka mahi tahi koe me nga ripanga ma te whakamahi i nga korero INSERT hei taapiri i nga rekoata me tetahi korero SELECT ki te uiui raraunga. I roto i te Amazon Kinesis Data Analytics, ka mahi koe me nga awa (STREAMs) me nga papua (PUMPs)—nga tono whakauru tonu e whakauru ana i nga raraunga mai i tetahi awa i roto i te tono ki tetahi atu awa.

Ko te patai SQL kua whakaatuhia i runga ake nei mo nga tikiti Aeroflot i raro i te rima mano rubles. Ko nga rekoata katoa e tutuki ana ki enei tikanga ka tuu ki te awa DESTINATION_SQL_STREAM.

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
I roto i te paraka Ūnga, tīpakohia te awa_motuhake, kei roto i te ingoa awa-roto DESTINATION_SQL_STREAM rārangi taka-iho:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Ko te hua o nga raweke katoa me rite ki te pikitia i raro nei:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau

Te hanga me te ohauru ki tetahi kaupapa SNS

Haere ki te Ratonga Whakamōhiotanga Maamaa ka hanga kaupapa hou ki reira me te ingoa Airlines:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Ohauru ki tenei kaupapa me te tohu i te nama waea pūkoro ka tukuna nga whakamohiotanga SMS:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau

Waihangahia he ripanga ki DynamoDB

Hei rokiroki i nga raraunga mata mai i a raatau rererangi_tickets awa, me hanga he ripanga ki DynamoDB me te ingoa kotahi. Ka whakamahia e matou te record_id hei matua matua:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau

Te hanga kaikohi mahi lambda

Me hanga he mahi lambda e kiia nei ko Collector, ko tana mahi ko te pooti i te awa rererangi_tickets a, ki te kitea he rekoata hou ki reira, whakauruhia enei rekoata ki te ripanga DynamoDB. Ma te mohio, i tua atu i nga mana taunoa, me whai panui tenei lambda ki te awa raraunga Kinesis me te tuhi uru ki DynamoDB.

Te hanga mahi IAM mo te mahi kaikohi lambda
Tuatahi, me hanga he mahi IAM hou mo te lambda ko Lambda-TicketsProcessingRole:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Mo te tauira whakamatautau, he tino pai nga kaupapa here AmazonKinesisReadOnlyAccess me AmazonDynamoDBFullAccess kua whirihora-mua, penei i te pikitia i raro nei:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau

Me whakarewahia tenei lambda e te keu mai i te Kinesis ka uru mai nga urunga hou ki te airline_stream, no reira me taapiri he keu hou:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Ko nga mea e toe ana ko te whakauru i te waehere me te tiaki i te lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Te hanga i te whakamohiotanga mahi lambda

Ko te mahi tuarua o te lambda, ka aro turuki i te awa tuarua (special_stream) me te tuku panui ki a SNS, he rite tonu te hanga. Na reira, me whai waahi tenei lambda ki te panui mai i te Kinesis me te tuku karere ki tetahi kaupapa SNS kua hoatu, ka tukuna atu e te ratonga SNS ki nga kaiohauru katoa o tenei kaupapa (imera, SMS, me etahi atu).

Te hanga mahi IAM
Tuatahi, ka waihangahia e matou te mahi IAM Lambda-KinesisAlarm mo tenei lambda, ka tautapa tenei mahi ki te alarm_notifier lambda e hanga ana:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau

Me mahi tenei lambda i runga i te keu mo nga rekoata hou kia uru ki roto i te special_stream, no reira me whirihora e koe te keu kia rite ki ta matou i mahi mo te Collector lambda.

Kia ngawari ake te whirihora i tenei lambda, me whakauru mai he taurangi taiao hou - TOPIC_ARN, ki reira tuu ai te ANR (Amazon Recourse Names) o te kaupapa rererangi:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
A ka whakauru i te waehere lambda, ehara i te mea uaua:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Te ahua nei kei konei te whakaotinga o te whirihoranga punaha a-ringa. Ko te mea e toe ana ko te whakamatautau me te whakarite kua whirihora tika nga mea katoa.

Tukuna mai i te waehere Terraform

Te whakarite

Terraform he taputapu tuwhera-puna tino watea mo te tuku hanganga mai i te waehere. Kei a ia ake te wetereo he ngawari ki te ako me te maha o nga tauira mo te pehea me te aha hei tohatoha. Ko te ētita Atom, Visual Studio Waehere ranei he maha nga taputapu taapiri e pai ake ai te mahi me te Terraform.

Ka taea e koe te tango i te tohatoha mai i konei. Ko te tātaritanga taipitopito o nga kaha katoa o Terraform kei tua atu i te whānuitanga o tenei tuhinga, no reira ka whakawhäitihia e mätou ki ngä kaupapa matua.

Me pehea te whakahaere

Ko te waehere katoa o te kaupapa i roto i taku putunga. Ka kati tatou i te putunga ki a tatou ano. I mua i te tiimata, me mohio koe kua whakauruhia e koe te AWS CLI me te whirihora, na te mea ... Ka kimihia e Terraform nga tohu kei te konae ~/.aws/credentials.

Ko te mahi pai ko te whakahaere i te whakahau mahere i mua i te tuku i nga hanganga katoa kia kite he aha te mahi a Terraform mo tatou i te kapua:

terraform.exe plan

Ka akiakihia koe ki te whakauru i tetahi nama waea hei tuku whakamohiotanga. Kaore e tika kia uru mai i tenei waahanga.

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
I te wetewete i te mahere mahi o te hotaka, ka taea e tatou te timata ki te hanga rauemi:

terraform.exe apply

I muri i te tuku i tenei whakahau, ka tonohia ano koe ki te whakauru i tetahi nama waea; waeahia "ae" ka whakaatuhia he patai mo te tino mahi i nga mahi. Ma tenei ka taea e koe te whakarite i nga hanganga katoa, te kawe i nga whirihoranga e tika ana mo te EC2, te tohatoha i nga mahi lambda, me era atu.

I muri i te hanganga angitu o nga rauemi katoa na roto i te waehere Terraform, me uru koe ki nga korero mo te tono Kinesis Analytics (kaore au i kitea me pehea te mahi tika mai i te waehere).

Whakarewahia te tono:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
I muri i tenei, me tino tautuhi koe i te ingoa awa-a-tono ma te kowhiri mai i te rarangi taka-iho:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Inaianei kua rite nga mea katoa ki te haere.

Te whakamatautau i te tono

Ahakoa te pehea i tukuna ai e koe te punaha, ma te ringaringa, ma te waehere Terraform ranei, ka rite tonu te mahi.

Ka takiuru matou ma te SSH ki te miihini mariko EC2 kei reira ka whakauruhia a Kinesis Agent ka whakahaere i te tuhinga api_caller.py

sudo ./api_caller.py TOKEN

Ko taau anake me tatari mo te SMS ki to nama:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
SMS - ka tae mai te karere ki runga waea i te tata ki te 1 meneti:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau
Kei te noho tonu ki te kite mena i tiakina nga rekoata ki te paataka raraunga DynamoDB mo nga waahanga o muri mai, he tino taipitopito. Kei roto i te ripanga rererangi_tickets nga raraunga e whai ake nei:

Ko te whakaurunga API Aviasales me te Amazon Kinesis me te ngawari o te kaimau

mutunga

Hei waahanga o nga mahi i mahia, i hangaia he punaha tukatuka raraunga ipurangi i runga i te Amazon Kinesis. Ko nga whiringa mo te whakamahi i te Kinesis Agent i te taha o nga Raraunga Raraunga Kinesis me nga tātaritanga-a-waa Kinesis Analytics ma te whakamahi i nga whakahau SQL, me te taunekeneke a Amazon Kinesis me etahi atu ratonga AWS i whakaarohia.

I tukuna e matou te punaha i runga ake nei i nga huarahi e rua: he pukapuka roa me te tere mai i te waehere Terraform.

Kei te waatea te waehere puna kaupapa katoa i roto i taku putunga GitHub, Ko taku whakaaro kia mohio koe ki a koe.

Kei te harikoa ahau ki te matapaki i te tuhinga, kei te tumanako ahau ki o korero. Te ti'aturi nei au mo nga whakaheinga whai hua.

E hiahia ana ahau kia angitu koe!

Source: will.com

Tāpiri i te kōrero