Integrating the Aviasales API with Amazon Kinesis, and the simplicity of serverless

Hello, Habr!

Do you like flying? I do, but during self-isolation I also got hooked on analyzing air ticket data from one well-known resource, Aviasales.

Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, set up the Amazon DynamoDB NoSQL database as the main data store, and configure SMS notifications for interesting tickets.

All the details are under the cut! Let's go!


Introduction

For this example we need access to the Aviasales API. Access is free and unrestricted; you only need to register in the "Developers" section to get an API token for accessing the data.

The main goal of this article is to give a general understanding of how data streaming works in AWS. We accept that the data returned by the API is not strictly up to date: it is served from a cache built from the searches made by users of Aviasales.ru and Jetradar.com over the last 48 hours.

The Kinesis Agent installed on the producer machine will automatically parse the data received from the API and send it to the target stream, which is then analyzed by Kinesis Data Analytics. The raw version of this stream will be written directly to storage. Keeping the raw data in DynamoDB allows deeper ticket analysis with BI tools such as AWS QuickSight.

We will consider two options for deploying the whole infrastructure:

  • Manually, through the AWS Management Console;
  • As infrastructure from Terraform code, for lazy automators;

Architecture of the system being built

Components used:

  • Aviasales API: the data returned by this API will be used for all subsequent work;
  • EC2 Producer instance: an ordinary virtual machine in the cloud on which the input data stream will be generated:
    • Kinesis Agent is a Java application installed locally on the machine that provides an easy way to collect data and send it to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent continuously monitors a set of files in the specified directories and sends new data to Kinesis;
    • API Caller Script: a Python script that makes requests to the API and puts the responses into the folder monitored by the Kinesis Agent;
  • Kinesis Data Streams: a real-time data streaming service with wide scaling capabilities;
  • Kinesis Analytics: a serverless service that simplifies real-time analysis of streaming data. Amazon Kinesis Data Analytics configures the application resources and scales automatically to handle any volume of incoming data;
  • AWS Lambda: a service that lets you run code without provisioning or managing servers. All computing capacity is scaled automatically for each call;
  • Amazon DynamoDB: a key-value and document database that delivers latency of less than 10 milliseconds at any scale. With DynamoDB you don't need to provision, patch, or manage any servers. DynamoDB automatically scales tables to adjust the amount of available resources and maintain high performance. No system administration is required;
  • Amazon SNS: a fully managed messaging service based on the publisher-subscriber (Pub/Sub) model, which lets you decouple microservices, distributed systems, and serverless applications. SNS can be used to send information to end users via mobile push notifications, SMS messages, and email.

Preliminary preparation

To emulate the data stream, I decided to use the air ticket information returned by the Aviasales API. The documentation lists quite a few different methods; let's take one of them, "Monthly price calendar", which returns prices for each day of the month, grouped by the number of transfers. If you don't specify the search month in the request, the information returned is for the month following the current one.

So, let's register and get our token.

An example request is below:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Passing the token in the request URL, as above, works, but I prefer to pass the access token in a header, so that is the approach used in the api_caller.py script.
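
For comparison, here is a minimal standard-library sketch of the header-based approach. It assumes the month-matrix endpoint from the example URL and the X-Access-Token header that prepare_request() in api_caller.py sets; the helper name build_month_matrix_request and the Kazan airport code KZN are illustrative choices, not part of the original script. It only builds the request; urllib.request.urlopen(req) would send it.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Endpoint from the example request above
BASE_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_month_matrix_request(token, origin, destination, currency="rub"):
    """Build a GET request that carries the API token in a header.

    Hypothetical helper for illustration only.
    """
    query = urlencode({
        "currency": currency,
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    })
    # No "Accept-Encoding: gzip" here: urllib does not decompress
    # gzip responses automatically
    headers = {"X-Access-Token": token}
    return Request(BASE_URL + "?" + query, headers=headers, method="GET")


# St. Petersburg (LED) to Kazan (KZN); the token never appears in the URL
req = build_month_matrix_request("TOKEN_API", "LED", "KZN")
```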

Example response:

{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}
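
A small sketch of reading such a response with the json module and picking the cheapest entry. It assumes, as in the sample, that each item of data carries its price in the value field; cheapest_ticket is a hypothetical helper, not part of api_caller.py.

```python
import json

# Trimmed copy of the sample response above
SAMPLE = '''{
  "success": true,
  "data": [{"origin": "LED", "destination": "HKT", "depart_date": "2015-10-01",
            "number_of_changes": 1, "value": 29127, "actual": true}]
}'''


def cheapest_ticket(raw_json):
    """Return the cheapest item of a month-matrix response, or None."""
    payload = json.loads(raw_json)
    if not payload.get("success"):
        return None
    tickets = payload.get("data", [])
    # "value" holds the price in the requested currency
    return min(tickets, key=lambda t: t["value"], default=None)


best = cheapest_ticket(SAMPLE)
```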

The API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I'm from Kazan, and Phuket is now "just a dream", let's look for tickets from St. Petersburg to Kazan.

It is assumed that you already have an AWS account. I want to point out right away that Kinesis and sending notifications via SMS are not included in the annual Free Tier. Even so, with a couple of dollars set aside, it is quite possible to build the proposed system and play with it. And, of course, don't forget to delete all the resources once they are no longer needed.

Fortunately, DynamoDB and the Lambda functions will be free for us as long as we stay within the monthly free limits. For DynamoDB, for example: 25 GB of storage, 25 WCU/RCU, and 100 million requests. For Lambda: one million function invocations per month.

Manual system deployment

Creating Kinesis Data Streams

Let's go to the Kinesis Data Streams service and create two new streams, with one shard each.

What is a shard?
A shard is the basic unit of throughput of an Amazon Kinesis stream. One shard provides data input at 1 MB/s and data output at 2 MB/s, and supports up to 1000 PUT records per second. When creating a data stream, you specify the required number of shards. For example, you can create a data stream with two shards: it will provide data input at 2 MB/s and data output at 4 MB/s, supporting up to 2000 PUT records per second.

The more shards in your stream, the higher its throughput. In principle, this is how streams are scaled: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus 1.4 cents for every million PUT payload units.
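
The sizing rule and the pricing above can be turned into a quick back-of-the-envelope calculation. This sketch takes the per-shard limits and prices quoted in this section (prices differ by region and change over time) and assumes a 730-hour month; the caller supplies the number of PUT payload units.

```python
import math

# Per-shard limits quoted above
SHARD_IN_MB_S = 1.0
SHARD_OUT_MB_S = 2.0
SHARD_RECORDS_S = 1000

# Prices quoted above, in USD (region- and time-dependent)
SHARD_HOUR_USD = 0.015
PER_MILLION_PUT_UNITS_USD = 0.014


def shards_needed(in_mb_s, out_mb_s, records_s):
    """Smallest shard count that satisfies every per-shard limit."""
    return max(1,
               math.ceil(in_mb_s / SHARD_IN_MB_S),
               math.ceil(out_mb_s / SHARD_OUT_MB_S),
               math.ceil(records_s / SHARD_RECORDS_S))


def monthly_cost_usd(shards, put_payload_units, hours=730):
    """Shard-hours plus PUT payload units for one month (~730 hours assumed)."""
    return (shards * hours * SHARD_HOUR_USD
            + put_payload_units / 1_000_000 * PER_MILLION_PUT_UNITS_USD)


# 1.5 MB/s of input already needs a second shard
print(shards_needed(1.5, 2.0, 500))
```

A single shard left running for a whole month therefore costs roughly 730 × 0.015 ≈ 11 USD before PUT charges, which is why deleting the streams after experimenting matters.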

Let's create a new stream named airline_tickets; 1 shard will be enough for it.

Now let's create another stream named special_stream.
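
The same two streams can also be created from code. A minimal sketch, assuming a boto3 Kinesis client is passed in (for example boto3.client("kinesis")); passing the client as a parameter keeps the function testable with a stub. It uses the create_stream call and the stream_exists waiter.

```python
def create_streams(kinesis, names, shard_count=1):
    """Create each stream, then wait until all of them become ACTIVE.

    `kinesis` is expected to be a boto3 Kinesis client.
    """
    for name in names:
        kinesis.create_stream(StreamName=name, ShardCount=shard_count)
    # Stream creation is asynchronous; the waiter polls until ACTIVE
    waiter = kinesis.get_waiter("stream_exists")
    for name in names:
        waiter.wait(StreamName=name)


# Usage (requires AWS credentials):
# import boto3
# create_streams(boto3.client("kinesis"), ["airline_tickets", "special_stream"])
```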


Producer setup

To work through the task, an ordinary EC2 instance is enough as the data producer. It doesn't have to be a powerful, expensive virtual machine; a t2.micro spot instance will do just fine.

Important note: for this example, use the Amazon Linux AMI 2018.03.0 image; it needs fewer settings to get the Kinesis Agent running quickly.

Go to the EC2 service, create a new virtual machine, and select the desired AMI with the t2.micro instance type, which is included in the Free Tier.

For the newly created virtual machine to interact with the Kinesis service, it must be granted permission to do so. The best way to do this is to assign it an IAM role. So, on the Step 3: Configure Instance Details screen, select Create new IAM role.

Creating an IAM role for EC2
In the window that opens, choose to create a new role for EC2 and go to the Permissions section.

Since this is a training example, we won't go into all the intricacies of fine-grained resource permissions and will simply pick the policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.

Let's give the role a meaningful name, for example EC2-KinesisStreams-FullAccess.
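
The console steps for this role can also be scripted. A sketch assuming a boto3 IAM client is passed in (for example boto3.client("iam")), the role name above, and the two AWS-managed policies. Unlike the console, the IAM API does not create an instance profile automatically, so the sketch creates one with the same name; that is what actually gets attached to an EC2 instance.

```python
import json

# Lets EC2 instances assume the role
EC2_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "ec2.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

MANAGED_POLICIES = [
    "arn:aws:iam::aws:policy/AmazonKinesisFullAccess",
    "arn:aws:iam::aws:policy/CloudWatchFullAccess",
]


def create_ec2_role(iam, role_name="EC2-KinesisStreams-FullAccess"):
    """Create the role, attach the managed policies, wrap it in an instance profile."""
    iam.create_role(RoleName=role_name,
                    AssumeRolePolicyDocument=json.dumps(EC2_TRUST_POLICY))
    for arn in MANAGED_POLICIES:
        iam.attach_role_policy(RoleName=role_name, PolicyArn=arn)
    # The console does this step implicitly; the API needs it spelled out
    iam.create_instance_profile(InstanceProfileName=role_name)
    iam.add_role_to_instance_profile(InstanceProfileName=role_name,
                                     RoleName=role_name)
```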

After creating the new role, don't forget to attach it to the virtual machine instance being created.

We don't change anything else on this screen and move on to the next ones.

The storage (disk) settings can be left at their defaults, as can the tags (although it is good practice to use tags, at least to give the instance a name and indicate the environment).

Now we are on the Step 6: Configure Security Group tab, where you need to create a new security group or specify an existing one that allows you to connect to the instance via ssh (port 22). Select Source -> My IP there, and you can launch the instance.

As soon as the instance switches to the running state, you can try connecting to it via ssh.

To be able to work with the Kinesis Agent, after successfully connecting to the machine, enter the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Let's create a folder to store the API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to configure it:

sudo vim /etc/aws-kinesis/agent.json

The contents of agent.json should look like this:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As the configuration file shows, the agent will monitor files with the .log extension in the /var/log/airline_tickets/ directory, parse them, and send them to the airline_tickets stream.
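
To see roughly what the CSVTOJSON option does with one log line, here is a local imitation using the csv module: each comma-separated value is paired with the field name at the same position in customFieldNames. This is an approximation for illustration, not the agent's code; in this sketch every value stays a string, and a line with fewer columns simply yields fewer keys.

```python
import csv
import json

# Field order copied from customFieldNames in agent.json above
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]


def csv_line_to_json(line, field_names=FIELD_NAMES):
    """Rough local imitation of the agent's CSVTOJSON processing option."""
    values = next(csv.reader([line]))
    # zip() pairs names and values by position and stops at the shorter list
    return json.dumps(dict(zip(field_names, values)))
```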

Restart the service and make sure it is up and running:

sudo service aws-kinesis-agent restart

Now let's download the Python script that will request data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and saves the received response in the directory that the Kinesis agent monitors. The implementation of this script is fairly standard: a TicketsApi class lets you call the API asynchronously. We pass the headers with the token and the request parameters to this class:

import asyncio
import sys

from aiohttp import ClientResponseError, ClientSession, FormData

# LOGGER, BASE_URL, CURRENCY, ORIGIN, DESTINATION, SHOW_TO_AFFILIATES,
# TRIP_DURATION, TARGET_FILE and log_maker() are defined elsewhere
# in api_caller.py


class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                # aiohttp raises ClientResponseError for 4xx/5xx statuses
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except ClientResponseError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    # The token travels in a header instead of the URL
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            # Append the tickets to the log file watched by the Kinesis Agent
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)


if __name__ == '__main__':
    # asyncio.run() needs Python 3.7+, while the instance has Python 3.6
    asyncio.get_event_loop().run_until_complete(main())
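
log_maker() itself is not shown in the article. A hypothetical version could write one CSV line per ticket in the customFieldNames order from agent.json, so that the agent's CSVTOJSON option can map the columns back to names. The explicit target_file parameter, the uuid4-based record_id and the empty column for a missing key are assumptions of this sketch (the original call passes only response and writes to TARGET_FILE). Note that the month-matrix sample earlier calls the price value rather than cost, so real items may need renaming first.

```python
import csv
import uuid

# Must match customFieldNames in /etc/aws-kinesis/agent.json
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]


def log_maker(response, target_file):
    """Append one CSV line per ticket; return the number of lines written.

    Hypothetical sketch, not the script's actual implementation.
    """
    rows = 0
    # newline="" hands line-ending control to the csv module
    with open(target_file, "a", newline="") as log_file:
        # Plain "\n" endings, so no stray "\r" lands in the record_id field
        writer = csv.writer(log_file, lineterminator="\n")
        for ticket in response.get("data", []):
            record = dict(ticket, record_id=str(uuid.uuid4()))
            writer.writerow([record.get(name, "") for name in FIELD_NAMES])
            rows += 1
    return rows
```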

To check that the agent is configured correctly and working, let's do a test run of the api_caller.py script:

sudo ./api_caller.py TOKEN

We also check the result in the Agent log and on the Monitoring tab of the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

As you can see, everything works and the Kinesis Agent successfully sends data to the stream. Now let's set up the consumer.

Creating Kinesis Data Analytics

Let's move on to the central component of the whole system: create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app.

Kinesis Data Analytics lets you run real-time analytics on data from Kinesis Streams using SQL. It is a fully autoscaling service (unlike Kinesis Streams) that:

  1. lets you create new streams (Output Stream) based on queries over the source data;
  2. provides a stream of errors that occurred while the application was running (Error Stream);
  3. can automatically detect the input data schema (it can be redefined manually if needed).

This is not a cheap service: 0.11 USD per hour of operation, so use it carefully and delete it when you are done.

Let's connect the application to the data source.

Select the stream we are going to connect to (airline_tickets).

Next, you need to attach a new IAM role so that the application can read from the stream and write to a stream. To do this, it is enough to leave the Access permissions block unchanged.

Now let's request discovery of the data schema in the stream by clicking the "Discover schema" button. As a result, the IAM role will be updated (a new one will be created) and schema detection will start from the data that has already arrived in the stream.

Now go to the SQL editor. When you click this button, a window appears asking whether to start the application; choose what you want to run.

Paste the following simple query into the SQL editor window and click Save and run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases, you work with tables: INSERT statements add records and SELECT statements query data. In Amazon Kinesis Data Analytics, you work with streams (STREAMs) and pumps (PUMPs): continuous insert queries that move data from one in-application stream into another.

The SQL query above looks for Aeroflot tickets priced below five thousand rubles. All records that meet these conditions are placed into the DESTINATION_SQL_STREAM stream.
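
Purely as an illustration of the WHERE clause semantics (not of how Kinesis Data Analytics executes SQL), the pump behaves like a filter applied to every incoming record. The sketch assumes records arrive as dicts with a numeric cost and a gate string, matching the DOUBLE and VARCHAR columns of DESTINATION_SQL_STREAM; stream_pump is a hypothetical name. Note that the strict < excludes a ticket costing exactly 5000.

```python
def stream_pump(records, max_cost=5000, gate="Aeroflot"):
    """Yield (cost, gate) for each record that matches the WHERE clause."""
    for record in records:
        # WHERE "cost" < 5000 AND "gate" = 'Aeroflot'
        if record["cost"] < max_cost and record["gate"] == gate:
            yield record["cost"], record["gate"]


incoming = [
    {"cost": 4600.0, "gate": "Aeroflot"},
    {"cost": 5000.0, "gate": "Aeroflot"},  # not strictly below 5000
    {"cost": 3100.0, "gate": "S7"},        # different gate
]
matches = list(stream_pump(incoming))
```

Being a generator, stream_pump also mirrors the continuous nature of a pump: it processes records one at a time as they are pulled, rather than materializing a result table.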

In the Destination block, select special_stream, and in the In-application stream name drop-down list, select DESTINATION_SQL_STREAM.

After all these steps, the application should read from airline_tickets and write the query results to special_stream.


Creating and subscribing to an SNS topic

Go to Simple Notification Service and create a new topic there for the airline ticket notifications.

Subscribe to this topic and specify the mobile phone number to which SMS notifications will be sent.
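
Both console steps can be sketched with a boto3 SNS client passed in as `sns` (for example boto3.client("sns")). The topic name and phone number in the test-style usage are placeholders, not values from the article; SNS expects phone numbers in E.164 format (a + followed by the country code and number).

```python
def create_sms_alerts(sns, topic_name, phone_number):
    """Create (or reuse) a topic and subscribe a phone number to it via SMS."""
    # create_topic is idempotent: an existing topic with this name is returned
    topic_arn = sns.create_topic(Name=topic_name)["TopicArn"]
    sns.subscribe(TopicArn=topic_arn, Protocol="sms", Endpoint=phone_number)
    return topic_arn


# Usage (requires AWS credentials; placeholders):
# import boto3
# arn = create_sms_alerts(boto3.client("sns"), "airline_alerts", "+70000000000")
```

The returned ARN is what the notifier lambda later needs in its TOPIC_ARN environment variable.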


Creating a table in DynamoDB

To store the raw data from the airline_tickets stream, let's create a DynamoDB table with the same name. We will use record_id as the primary key.
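
The table can also be created with a boto3 DynamoDB client passed in as `dynamodb` (for example boto3.client("dynamodb")). The sketch assumes record_id is a string partition key; the provisioned capacity of 5 read and 5 write units is an assumed value chosen to stay well inside the free-tier limits mentioned earlier.

```python
def create_tickets_table(dynamodb, table_name="airline_tickets"):
    """Create a table keyed by record_id and wait until it is ACTIVE."""
    dynamodb.create_table(
        TableName=table_name,
        KeySchema=[{"AttributeName": "record_id", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "record_id",
                               "AttributeType": "S"}],
        # Assumed capacity, comfortably inside the free tier
        ProvisionedThroughput={"ReadCapacityUnits": 5,
                               "WriteCapacityUnits": 5},
    )
    dynamodb.get_waiter("table_exists").wait(TableName=table_name)
```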


Creating the collector lambda function

Let's create a lambda function called Collector, whose job is to poll the airline_tickets stream and, if new records are found there, insert them into the DynamoDB table. Obviously, in addition to the default permissions, this lambda needs read access to the Kinesis data stream and write access to DynamoDB.

Creating an IAM role for the collector lambda function
First, let's create a new IAM role for the lambda named Lambda-TicketsProcessingRole.

For a test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable.


This lambda should be launched by a Kinesis trigger whenever new records arrive in the airline_tickets stream, so we need to add a new trigger.
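
Under the hood, a Kinesis trigger is an event source mapping. A sketch with a boto3 Lambda client passed in as `lambda_client` (for example boto3.client("lambda")); the stream ARN must be that of airline_tickets, while the batch size of 100 and StartingPosition="LATEST" are assumed values, not settings taken from the article.

```python
def add_kinesis_trigger(lambda_client, stream_arn, function_name="Collector",
                        batch_size=100):
    """Subscribe a Lambda function to a Kinesis stream; return the mapping UUID."""
    resp = lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        # LATEST: only records written after the mapping is created
        StartingPosition="LATEST",
        BatchSize=batch_size,
    )
    return resp["UUID"]
```

The same call with the special_stream ARN and the notifier function's name would wire up the second lambda.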

All that remains is to paste in the code and save the lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(Item=dynamodb_item)

        print('Added', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
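
For a quick local check of the decoding logic, a Kinesis-shaped test event can be built by hand. The sketch fills in only Records[].kinesis.data, the one field TicketsParser reads; a real event carries more metadata. make_kinesis_event and decode_records are hypothetical helpers, and calling lambda_handler with such an event would still need access to the DynamoDB table (or a mock of it).

```python
import base64
import json


def make_kinesis_event(items):
    """Wrap dicts in the minimal shape a Kinesis-triggered Lambda receives."""
    return {"Records": [
        {"kinesis": {"data": base64.b64encode(json.dumps(item).encode()).decode()}}
        for item in items
    ]}


def decode_records(records):
    """Same decoding steps as TicketsParser.get_json_data."""
    return [json.loads(base64.b64decode(r["kinesis"]["data"])) for r in records]


event = make_kinesis_event([{"record_id": "1", "cost": "4600"}])
```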

Creating the notifier lambda function

The second lambda function, which monitors the second stream (special_stream) and sends notifications to SNS, is created in the same way. This lambda therefore needs permission to read from Kinesis and to publish messages to the given SNS topic, which the SNS service then delivers to all subscribers of the topic (email, SMS, etc.).

Creating an IAM role
First, we create the IAM role Lambda-KinesisAlarm for this lambda and then assign it to the alarm_notifier lambda being created.


This lambda should run on a trigger when new records arrive in special_stream, so configure the trigger the same way we did for the Collector lambda.

To make this lambda easier to configure, let's introduce a new environment variable, TOPIC_ARN, holding the ARN (Amazon Resource Name) of the SNS topic created earlier.

Then add the lambda code; it is not complicated at all:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))
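
The handler above sends the same fixed text for every batch. If the message should say what was found, the records can be decoded first. This sketch assumes the Kinesis Analytics destination writes JSON records whose keys are the column names of DESTINATION_SQL_STREAM (cost and gate); build_alarm_message is a hypothetical helper whose result would be passed as Message= to SNS_CLIENT.publish.

```python
import base64
import json


def build_alarm_message(records):
    """Summarise the tickets that reached special_stream in one short text."""
    tickets = [json.loads(base64.b64decode(r["kinesis"]["data"])) for r in records]
    # Prices are rounded to whole rubles for an SMS-friendly message
    lines = ["{} for {:.0f} RUB".format(t["gate"], t["cost"]) for t in tickets]
    return "Found {} cheap ticket(s): {}".format(len(tickets), "; ".join(lines))
```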

It looks like the manual setup of the system is complete at this point. All that remains is to test it and make sure everything is configured correctly.

Deploying from Terraform code

Required preparation

Terraform is a very convenient open-source tool for deploying infrastructure from code. It has its own syntax that is easy to learn, and there are many examples of how and what to deploy. The Atom and Visual Studio Code editors have many handy plugins that make working with Terraform easier.

You can download the distribution from the official Terraform website. A detailed look at all of Terraform's capabilities is beyond the scope of this article, so we will limit ourselves to the main points.

How to start

The full project code is in my GitHub repository; clone it to your machine. Before starting, make sure the AWS CLI is installed and configured, because Terraform will look for credentials in the ~/.aws/credentials file.
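
Before running Terraform, one quick way to confirm that the credentials file mentioned above is in place is to parse it with configparser. The sketch assumes the default profile; has_aws_credentials is a hypothetical helper, and Terraform can also pick up credentials from other sources (such as environment variables), which this check ignores.

```python
import configparser
import os


def has_aws_credentials(path="~/.aws/credentials", profile="default"):
    """Return True if the profile exists and has both access-key entries."""
    parser = configparser.ConfigParser()
    # read() returns the list of files it managed to parse
    if not parser.read(os.path.expanduser(path)):
        return False
    if not parser.has_section(profile):
        return False
    section = parser[profile]
    return bool(section.get("aws_access_key_id")) and \
        bool(section.get("aws_secret_access_key"))
```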

It is good practice to run the plan command before deploying the whole infrastructure, to see what Terraform is going to create for us in the cloud:

terraform.exe plan

You will be prompted to enter a phone number for the notifications. There is no need to enter it at this stage.

Having reviewed the plan, we can start creating the resources:

terraform.exe apply

After you run this command, you will again be asked for a phone number; type "yes" when asked to confirm performing the actions. This creates the whole infrastructure, carries out all the necessary EC2 configuration, deploys the lambda functions, and so on.

After all the resources have been created successfully through the Terraform code, you need to open the details of the Kinesis Analytics application (unfortunately, I did not find a way to do this directly from code).

Start the application.

After that, you must explicitly set the in-application stream name by selecting it from the drop-down list.

Now everything is ready to go.

Testing the application

Regardless of how you deployed the system, manually or through Terraform code, it will work the same way.

We log in via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:

sudo ./api_caller.py TOKEN

All that remains is to wait for the SMS on your phone.

The SMS arrives on the phone in about a minute.

It remains to check that the records have been saved in the DynamoDB database for subsequent, more detailed analysis: the airline_tickets table should now contain the ticket data.


Conclusion

In this project we built an online data processing system based on Amazon Kinesis. We looked at using the Kinesis Agent together with Kinesis Data Streams and real-time Kinesis Analytics with SQL commands, as well as at how Amazon Kinesis interacts with other AWS services.

We deployed the system above in two ways: a rather long manual one and a quick one from Terraform code.

The full source code of the project is available in my GitHub repository; I suggest you take a look at it.

I'm happy to discuss the article and look forward to your comments. Constructive criticism is welcome.

I wish you success!

Source: www.habr.com
