Integrating the Aviasales API with Amazon Kinesis, and the simplicity of serverless

Hey Habr!

Do you like flying? I do, but during self-isolation I also took a liking to analyzing airline ticket data from one well-known source: Aviasales.

Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, set up an Amazon DynamoDB NoSQL database as the main data store, and configure SMS notifications for interesting tickets.

All the details are under the cut! Let's go!


Introduction

For this example we need access to the Aviasales API. Access is free and unrestricted; you just need to register in the "Developers" section to receive your API token for accessing the data.

The main purpose of this article is to give a general understanding of how to use streaming data in AWS. Note that the data returned by the API is not strictly up to date: it is served from a cache built from searches made by users of Aviasales.ru and Jetradar.com over the last 48 hours.

A Kinesis Agent installed on the producer machine will automatically parse the data received from the API and send it to a Kinesis data stream, which Kinesis Data Analytics then processes to route the interesting records into a separate stream. The raw version of the stream will be written directly to storage. Keeping the raw data in DynamoDB makes deeper ticket analysis possible with BI tools such as AWS QuickSight.

We will consider two options for deploying the whole infrastructure:

  • Manually, via the AWS Management Console;
  • From Terraform code, for lazy automators;

Architecture of the system being built

Components used:

  • Aviasales API: the data returned by this API will be used for all subsequent work;
  • EC2 Producer Instance: a regular virtual machine in the cloud on which the input data stream will be generated:
    • Kinesis Agent is a Java application installed locally on the machine that provides an easy way to collect and send data to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent constantly monitors a set of files in the specified directories and sends new data to Kinesis;
    • API Caller Script: a Python script that makes requests to the API and puts the response into the folder monitored by the Kinesis Agent;
  • Kinesis Data Streams: a real-time data streaming service with wide scaling capabilities;
  • Kinesis Analytics is a serverless service that simplifies the analysis of streaming data in real time. Amazon Kinesis Data Analytics configures application resources and scales automatically to handle any volume of incoming data;
  • AWS Lambda: a service that lets you run code without provisioning or managing servers. Compute capacity is scaled automatically for each invocation;
  • Amazon DynamoDB: a key-value and document database that delivers latency under 10 milliseconds at any scale. With DynamoDB you don't need to provision, patch, or manage any servers. DynamoDB automatically scales tables to adjust capacity and maintain high performance. No system administration is required;
  • Amazon SNS: a fully managed publisher/subscriber (Pub/Sub) messaging service that lets you decouple microservices, distributed systems, and serverless applications. SNS can deliver information to end users via mobile push notifications, SMS messages, and email.

Initial preparation

To emulate a data stream, I decided to use the airline ticket information returned by the Aviasales API. The documentation contains an extensive list of methods; let's take one of them, the "Monthly Price Calendar", which returns prices for each day of the month, grouped by the number of transfers. If you don't specify a month in the request, data is returned for the month following the current one.

So, let's register and get our token.

An example request:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Passing the token in the query string, as above, works, but I prefer to send the access token in a header, so that is the approach the api_caller.py script uses.
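For illustration, a header-based call can be sketched with nothing but the Python standard library. This is not the author's api_caller.py: the helper names build_request and fetch_prices, the default destination KZN (Kazan, per the route chosen below), and the 10-second timeout are assumptions made for the example.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request, urlopen

BASE_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_request(token, origin="LED", destination="KZN", currency="rub"):
    """Build a GET request that carries the token in the X-Access-Token header."""
    query = urlencode({
        "currency": currency,
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    })
    # The token stays out of the URL, so it does not end up in access logs.
    return Request(f"{BASE_URL}?{query}", headers={"X-Access-Token": token})


def fetch_prices(token, **kwargs):
    """Send the request and decode the JSON body (requires network access)."""
    with urlopen(build_request(token, **kwargs), timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, `fetch_prices("TOKEN_API")` returns the parsed JSON response as a dict.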

Example response:

{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

The API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I'm from Kazan, and Phuket is now "just a dream", let's look for tickets from St. Petersburg to Kazan.

I assume you already have an AWS account. Note right away that Kinesis and SMS notifications are not included in the annual Free Tier. Even so, with a couple of dollars in mind, it is quite possible to build the proposed system and play with it. And of course, don't forget to delete all resources once you no longer need them.

Fortunately, DynamoDB and the lambda functions will be free for us as long as we stay within the monthly free limits. For DynamoDB, for example: 25 GB of storage, 25 WCU/RCU and 100 million requests. For lambda: one million invocations per month.

Manual deployment of the system

Setting up Kinesis Data Streams

Let's go to the Kinesis Data Streams service and create two new streams, each with one shard.

What is a shard?
A shard is the basic throughput unit of an Amazon Kinesis data stream. One shard provides 1 MB/s of input and 2 MB/s of output, and supports up to 1,000 PUT records per second. When you create a data stream, you specify the required number of shards. For example, a data stream with two shards provides 2 MB/s of input and 4 MB/s of output and supports up to 2,000 PUT records per second.

The more shards your stream has, the higher its throughput. In fact, this is how streams are scaled: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus another 1.4 cents per million PUT payload units.
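The sizing rule and prices above fit into a back-of-the-envelope calculator. This is a sketch under stated assumptions: 730 hours in a month, one PUT payload unit per started 25 KB of a record, and the prices quoted in this article, which vary by region and change over time.

```python
import math

# Per-shard limits and prices quoted in the text (USD, may vary by region).
INGRESS_MB_PER_SHARD = 1.0
EGRESS_MB_PER_SHARD = 2.0
RECORDS_PER_SHARD = 1000
SHARD_HOUR_USD = 0.015
USD_PER_MILLION_PUT_UNITS = 0.014
PUT_UNIT_KB = 25          # one PUT payload unit covers up to 25 KB of a record
HOURS_PER_MONTH = 730     # assumption: average month length


def shards_needed(ingress_mb_s, egress_mb_s, records_s):
    """Smallest shard count that satisfies all three per-shard limits."""
    return max(1,
               math.ceil(ingress_mb_s / INGRESS_MB_PER_SHARD),
               math.ceil(egress_mb_s / EGRESS_MB_PER_SHARD),
               math.ceil(records_s / RECORDS_PER_SHARD))


def monthly_cost(shards, records_per_month, avg_record_kb):
    """Shard-hours plus PUT payload units for one month, in USD."""
    units_per_record = max(1, math.ceil(avg_record_kb / PUT_UNIT_KB))
    put_units = records_per_month * units_per_record
    return (shards * HOURS_PER_MONTH * SHARD_HOUR_USD
            + put_units / 1_000_000 * USD_PER_MILLION_PUT_UNITS)
```

The two-shard example above checks out: `shards_needed(2.0, 4.0, 2000)` gives 2. One shard receiving a million small records a month comes to roughly 11 USD, almost all of it shard-hours.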

Let's create a new stream named airline_tickets; one shard will be enough for it:

Now let's create another stream named special_stream:

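If you prefer to script this step, the same two streams could be created with boto3 instead of the console. A minimal sketch, assuming boto3 is installed and AWS credentials are configured; `stream_specs` and `create_streams` are hypothetical helper names, and the client is passed in rather than created inside.

```python
STREAM_NAMES = ("airline_tickets", "special_stream")


def stream_specs(names=STREAM_NAMES, shard_count=1):
    """Keyword arguments for Kinesis CreateStream, one dict per stream."""
    return [{"StreamName": name, "ShardCount": shard_count} for name in names]


def create_streams(kinesis_client, names=STREAM_NAMES, shard_count=1):
    """Create the streams and block until each one exists.

    kinesis_client is expected to be boto3.client("kinesis").
    """
    for spec in stream_specs(names, shard_count):
        kinesis_client.create_stream(**spec)
    waiter = kinesis_client.get_waiter("stream_exists")
    for name in names:
        waiter.wait(StreamName=name)
```

Usage: `create_streams(boto3.client("kinesis"))`.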

Setting up the producer

For this task a regular EC2 instance is enough as the data producer. It doesn't have to be a powerful, expensive virtual machine; a t2.micro spot instance will do just fine.

Important note: for the instance you should use the Amazon Linux AMI 2018.03.0 image, which needs less configuration to get the Kinesis Agent up and running quickly.

Go to the EC2 service, create a new virtual machine, and choose this AMI with the t2.micro instance type, which is included in the Free Tier:

For the new virtual machine to interact with the Kinesis service, it must be granted permission to do so. The best way to do this is to assign an IAM Role. So on the Step 3: Configure Instance Details screen, select Create new IAM role:

Creating an IAM role for EC2
In the window that opens, choose to create a new role for EC2 and go to the Permissions section:

Since this is a training example, we don't need to go into the intricacies of granular resource permissions, so we'll pick the policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.

Let's give the role a meaningful name, for example EC2-KinesisStreams-FullAccess. The result should look like the screenshot below:

After creating the new role, don't forget to attach it to the instance being created:

We don't change anything else on this screen and move on to the next windows.
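For reference, the role created in the console above can also be expressed in code. A sketch, assuming boto3 and sufficient IAM permissions; `ec2_trust_policy` and `create_ec2_role` are hypothetical helpers. The console silently creates an instance profile with the same name as an EC2 role, so the sketch does that explicitly.

```python
import json

ROLE_NAME = "EC2-KinesisStreams-FullAccess"
MANAGED_POLICY_ARNS = (
    "arn:aws:iam::aws:policy/AmazonKinesisFullAccess",
    "arn:aws:iam::aws:policy/CloudWatchFullAccess",
)


def ec2_trust_policy():
    """Trust policy that lets EC2 instances assume the role."""
    return json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "ec2.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    })


def create_ec2_role(iam_client, role_name=ROLE_NAME):
    """Create the role, attach the managed policies, wrap it in an instance profile.

    iam_client is expected to be boto3.client("iam").
    """
    iam_client.create_role(RoleName=role_name,
                           AssumeRolePolicyDocument=ec2_trust_policy())
    for arn in MANAGED_POLICY_ARNS:
        iam_client.attach_role_policy(RoleName=role_name, PolicyArn=arn)
    # EC2 receives a role through an instance profile.
    iam_client.create_instance_profile(InstanceProfileName=role_name)
    iam_client.add_role_to_instance_profile(InstanceProfileName=role_name,
                                            RoleName=role_name)
    return role_name
```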

The storage settings can be left at their defaults, as can the tags (although using tags is good practice; at minimum, give the instance a name and indicate the environment).

Now we are on the Step 6: Configure Security Group tab, where you need to create a new security group or specify an existing one that allows ssh connections (port 22) to the instance. Select Source -> My IP there, and you can launch the instance.

As soon as it switches to the running state, you can try connecting to it via ssh.

To work with the Kinesis Agent, once connected to the machine, run the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Let's create a folder to store the API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to configure it:

sudo vim /etc/aws-kinesis/agent.json

The contents of agent.json should look like this:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As the configuration file shows, the agent will monitor files whose names end in log (such as *.log) in the /var/log/airline_tickets/ directory, parse them, and send them to the airline_tickets stream.
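To see what the CSVTOJSON option does to each line, here is a rough Python approximation of that step: every CSV column is mapped onto the matching name in customFieldNames. It is only an emulation for understanding, not the agent's code; it assumes comma-separated input and keeps every value as a string, which is why the Collector lambda later converts fields such as cost and duration.

```python
import csv
import io
import json

# Same order as customFieldNames in /etc/aws-kinesis/agent.json.
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]


def csv_line_to_json(line, field_names=FIELD_NAMES):
    """Approximate CSVTOJSON: map the CSV columns of one line onto field names."""
    values = next(csv.reader(io.StringIO(line)))
    if len(values) != len(field_names):
        raise ValueError(f"expected {len(field_names)} columns, got {len(values)}")
    return json.dumps(dict(zip(field_names, values)))
```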

Restart the service and make sure it is running:

sudo service aws-kinesis-agent restart

Now let's download the Python script that will request data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and saves the response in the directory the Kinesis agent scans. The implementation is fairly standard: a TicketsApi class calls the API asynchronously. We pass the class a header containing the token, plus the request parameters:

# Excerpt from api_caller.py. BASE_URL, CURRENCY, ORIGIN, DESTINATION,
# SHOW_TO_AFFILIATES, TRIP_DURATION, TARGET_FILE, LOGGER and log_maker()
# are defined elsewhere in the script.
import asyncio
import sys

from aiohttp import ClientResponseError, ClientSession


class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, params):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                # Query parameters belong in the URL of a GET request.
                response = await session.get(self.base_url, params=params)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except ClientResponseError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    # The token travels in a header instead of the query string.
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}
    # aiohttp accepts str, int or float values here.
    params = {'currency': CURRENCY,
              'origin': ORIGIN,
              'destination': DESTINATION,
              'show_to_affiliates': SHOW_TO_AFFILIATES,
              'trip_duration': TRIP_DURATION}
    return headers, params


async def main():
    """Run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, params = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(params)
    if response.get('success'):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)


if __name__ == '__main__':
    # asyncio.run() needs Python 3.7+, while the instance has python36.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

To check the agent's settings and operation, let's do a test run of the api_caller.py script:

sudo ./api_caller.py TOKEN

Then we check the result in the agent's log and on the Monitoring tab of the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

As you can see, everything works and the Kinesis Agent is successfully sending data to the stream. Now let's set up the consumer.

Setting up Kinesis Data Analytics

Let's move on to the central component of the whole system and create a new Kinesis Data Analytics application named kinesis_analytics_airlines_app:

Kinesis Data Analytics lets you run real-time analytics on data from Kinesis Streams using SQL. It is a fully autoscaling service (unlike Kinesis Streams) that:

  1. lets you create new streams (Output Stream) from queries over the source data;
  2. provides a stream of errors that occurred while the application was running (Error Stream);
  3. can detect the input data schema automatically (it can also be redefined manually if needed).

This is not a cheap service, at 0.11 USD per hour of operation, so use it carefully and delete it when you are done.

Let's connect the application to a data source:

Select the stream to connect to (airline_tickets):

Next, attach a new IAM role so that the application can read from one stream and write to another. To do this, just leave everything unchanged in the Access permissions block:

Now let's ask the service to discover the data schema in the stream by clicking the "Discover schema" button. As a result, the IAM role will be updated (a new one will be created) and a schema will be inferred from the data that has already arrived in the stream:

Now go to the SQL editor. When you click this button, a window appears asking whether to start the application; choose to start it:

Paste the following simple query into the SQL editor window and click Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases you work with tables, using INSERT statements to add records and SELECT statements to query data. In Amazon Kinesis Data Analytics you work with streams (STREAMs) and pumps (PUMPs): continuous insert queries that move data from one in-application stream into another.

The SQL query above looks for Aeroflot tickets priced below five thousand rubles. All records that meet these conditions are placed in DESTINATION_SQL_STREAM.
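To make the pump's semantics concrete, here is a Python analogue of STREAM_PUMP over an iterable of rows: it projects cost and gate and keeps only the rows matching the WHERE clause. This is an illustration only; the real filtering happens inside Kinesis Data Analytics, and `pump` is a hypothetical name.

```python
def pump(source_rows, max_cost=5000, gate="Aeroflot"):
    """Yield {cost, gate} for every source row that passes the WHERE clause."""
    for row in source_rows:
        cost = float(row["cost"])          # "cost" DOUBLE in the SQL schema
        if cost < max_cost and row["gate"] == gate:
            yield {"cost": cost, "gate": row["gate"]}
```

Because it is a generator, it processes rows one at a time as they arrive, much like the continuous query.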

In the Destination section, select the special_stream stream, and in the In-application stream name drop-down list select DESTINATION_SQL_STREAM:

After all these steps, the result should look roughly like the screenshot below:


Creating and subscribing to an SNS topic

Go to Simple Notification Service and create a new topic named Airlines:

Subscribe to this topic and specify the mobile phone number that SMS notifications will be sent to:


Creating a table in DynamoDB

To store the raw data from the airline_tickets stream, let's create a DynamoDB table with the same name. We'll use record_id as the primary key:

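The same table could be created with boto3. This is a sketch under assumptions: record_id is stored as a string (type "S"), and the 5/5 provisioned capacity is an arbitrary small value chosen to stay inside the free tier mentioned earlier. `table_definition` and `create_table` are hypothetical helpers.

```python
TABLE_NAME = "airline_tickets"


def table_definition(table_name=TABLE_NAME):
    """Keyword arguments for DynamoDB CreateTable, keyed on record_id."""
    return {
        "TableName": table_name,
        "KeySchema": [{"AttributeName": "record_id", "KeyType": "HASH"}],
        "AttributeDefinitions": [
            {"AttributeName": "record_id", "AttributeType": "S"}],
        # Small provisioned capacity, well inside the 25 WCU/RCU free tier.
        "ProvisionedThroughput": {"ReadCapacityUnits": 5,
                                  "WriteCapacityUnits": 5},
    }


def create_table(dynamodb_client, table_name=TABLE_NAME):
    """Create the table and wait until it exists.

    dynamodb_client is expected to be boto3.client("dynamodb").
    """
    dynamodb_client.create_table(**table_definition(table_name))
    dynamodb_client.get_waiter("table_exists").wait(TableName=table_name)
```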

Creating the collector lambda function

Let's create a lambda function called Collector whose job is to poll the airline_tickets stream and, when new records appear there, insert them into the DynamoDB table. Obviously, besides the default permissions, this lambda needs read access to the Kinesis data stream and write access to DynamoDB.

Creating an IAM role for the collector lambda function
First, let's create a new IAM role for the lambda named Lambda-TicketsProcessingRole:

For a test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are good enough, as shown in the screenshots below:


This lambda should be invoked by a trigger from Kinesis when new records arrive in the airline_tickets stream, so we need to add a new trigger:

All that remains is to paste in the code and save the lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            # str() first: Decimal(float) carries binary noise DynamoDB rejects.
            'cost': Decimal(str(json_item.get('cost'))),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
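Before relying on the trigger, you can exercise the decoding half of the Collector locally with a hand-built event. A minimal sketch: `make_kinesis_event` and `decode_event` are hypothetical helpers, and the event contains only the field TicketsParser actually reads (Records[*].kinesis.data). Real Kinesis events also carry partition keys, sequence numbers and more.

```python
import base64
import json


def make_kinesis_event(items):
    """Build a minimal Kinesis-to-Lambda event containing the given dicts."""
    return {"Records": [
        {"kinesis": {"data": base64.b64encode(
            json.dumps(item).encode("utf-8")).decode("ascii")}}
        for item in items
    ]}


def decode_event(event):
    """Same steps as TicketsParser.get_json_data: base64-decode, then parse JSON."""
    return [json.loads(base64.b64decode(record["kinesis"]["data"]))
            for record in event["Records"]]
```

Passing `make_kinesis_event([...])` to lambda_handler would actually write to DynamoDB, so decode_event is the safer thing to check first.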

Creating the notifier lambda function

The second lambda function, which watches the second stream (special_stream) and sends a notification to SNS, is created in the same way. This lambda therefore needs permission to read from Kinesis and to publish messages to the given SNS topic, which SNS then delivers to all subscribers of the topic (email, SMS, etc.).

Creating the IAM role
First, we create the IAM role Lambda-KinesisAlarm for this lambda, then assign it to the alarm_notifier lambda being created:


This lambda should fire on a trigger when new records enter special_stream, so configure the trigger the same way we did for the Collector lambda.

To make this lambda easier to configure, let's introduce a new environment variable, TOPIC_ARN, holding the ARN (Amazon Resource Name) of the Airlines topic:

And paste in the lambda code; it's not complicated at all:

import os

import boto3

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))
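The notifier above always sends the same fixed text. A variant could put the matched tickets into the message. A sketch, assuming the special_stream destination was configured with JSON output so each record carries the "cost" and "gate" columns; `build_alarm_message` is a hypothetical helper and RUB is assumed as the currency.

```python
import base64
import json


def build_alarm_message(records, currency="RUB"):
    """Summarize special_stream records as one short notification text."""
    lines = []
    for record in records:
        item = json.loads(base64.b64decode(record["kinesis"]["data"]))
        lines.append(f"{item.get('gate')}: {item.get('cost')} {currency}")
    if not lines:
        return "No matching tickets in this batch."
    return "Cheap tickets found! " + "; ".join(lines)
```

Inside lambda_handler, it would be used as `Message=build_alarm_message(event['Records'])` in the SNS_CLIENT.publish call.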

That seems to complete the manual configuration. All that remains is to test the system and make sure everything is set up correctly.

Deploying from Terraform code

Required preparation

Terraform is a convenient open-source tool for deploying infrastructure from code. It has its own easy-to-learn syntax and plenty of examples of what to deploy and how. The Atom and Visual Studio Code editors have many handy plugins that make working with Terraform easier.

You can download the distribution here. A detailed review of all of Terraform's capabilities is beyond the scope of this article, so we'll stick to the main points.

How to get started

The full project code is in my repository; clone it to your machine. Before starting, make sure the AWS CLI is installed and configured, because Terraform will look for credentials in the ~/.aws/credentials file.

It is good practice to run the plan command before deploying the whole infrastructure, to see what Terraform is about to create for us in the cloud:

terraform.exe plan

You will be asked to enter the phone number to send notifications to. It does not need to be entered at this stage.

After reviewing the execution plan, we can start creating the resources:

terraform.exe apply

After you run this command, you will again be asked for the phone number; type "yes" when asked to confirm the actions. Terraform will then create all the infrastructure, perform the necessary EC2 configuration, deploy the lambda functions, and so on.

Once all resources have been created by the Terraform code, open the details of the Kinesis Analytics application (unfortunately, I did not find a way to do this step directly from code).

Start the application:

After that, explicitly set the in-application stream name by selecting it from the drop-down list:

Now everything is ready to go.

Testing the application

Regardless of how you deployed the system, manually or with Terraform, it works the same way.

Log in via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:

sudo ./api_caller.py TOKEN

Now all you have to do is wait for an SMS on your phone:

The SMS arrives on the phone in about a minute:

It remains to check that the records were saved in DynamoDB for later, more detailed analysis. The airline_tickets table contains data like the following:

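As a first step toward that deeper analysis, here is a sketch that reads the whole table and finds the cheapest price per carrier. It assumes `table` is a boto3 DynamoDB Table resource (for example `boto3.resource("dynamodb").Table("airline_tickets")`). scan_all follows LastEvaluatedKey because a single Scan returns at most 1 MB. Both helper names are hypothetical, and a full scan is only reasonable for a small table like this one.

```python
from decimal import Decimal


def scan_all(table, **scan_kwargs):
    """Yield every item of the table, following LastEvaluatedKey pagination."""
    while True:
        page = table.scan(**scan_kwargs)
        yield from page.get("Items", [])
        if "LastEvaluatedKey" not in page:
            break
        scan_kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]


def cheapest_by_gate(items):
    """Map each gate (carrier) to the lowest cost seen; costs are Decimals."""
    best = {}
    for item in items:
        gate, cost = item.get("gate"), item.get("cost")
        if gate is None or cost is None:
            continue
        if gate not in best or cost < best[gate]:
            best[gate] = cost
    return best
```

Usage: `cheapest_by_gate(scan_all(table))`.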

Conclusion

In the course of this work we built an online data processing system based on Amazon Kinesis. We looked at using the Kinesis Agent together with Kinesis Data Streams, at real-time analytics with Kinesis Analytics using SQL, and at how Amazon Kinesis interacts with other AWS services.

We deployed the system in two ways: a rather long manual one and a quick one from Terraform code.

The full source code of the project is available in my GitHub repository; I suggest you take a look at it.

I'm happy to discuss the article and look forward to your comments. Constructive criticism is welcome.

Good luck!

Source: www.habr.com
