Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara

Hey Habr!

Tianao ve ny fiaramanidina manidina? Tiako izany, saingy nandritra ny fitokanan-tena dia nanjary tia ny famakafakana angon-drakitra momba ny tapakila fiaramanidina avy amin'ny loharano iray malaza - Aviasales.

Androany isika dia handinika ny asan'ny Amazon Kinesis, hanangana rafitra streaming miaraka amin'ny famakafakana amin'ny fotoana tena izy, hametraka ny tahiry Amazon DynamoDB NoSQL ho fitahirizana angon-drakitra lehibe, ary hametraka fampandrenesana SMS ho an'ny tapakila mahaliana.

Ny antsipiriany rehetra dia eo ambany fanapahana! Mandehana!

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara

fampidirana

Ohatra, mila fidirana amin'ny Aviasales API. Ny fidirana amin'izany dia omena maimaim-poana ary tsy misy fameperana; mila misoratra anarana ao amin'ny fizarana "Developers" fotsiny ianao mba hahazoana ny mari-pamantarana API anao hidirana amin'ny angona.

Ny tanjon'ity lahatsoratra ity dia ny hanome fahatakarana ankapobeny ny fampiasana ny fampitana vaovao amin'ny AWS; raisinay fa ny angon-drakitra naverin'ny API ampiasaina dia tsy tena manara-penitra ary ampitaina avy amin'ny cache, izay noforonina mifototra amin'ny fikarohana nataon'ireo mpampiasa ny tranokala Aviasales.ru sy Jetradar.com nandritra ny 48 ora farany.

Kinesis-agent, apetraka amin'ny milina mpamokatra, voaray amin'ny alàlan'ny API dia hamakivaky ho azy sy handefa ny angona amin'ny renirano tiana amin'ny alàlan'ny Kinesis Data Analytics. Ny dikan-teny manta amin'ity stream ity dia hosoratana mivantana any amin'ny fivarotana. Ny fitahirizana angon-drakitra manta napetraka ao amin'ny DynamoDB dia ahafahana manadihady tapakila lalindalina kokoa amin'ny alàlan'ny fitaovana BI, toy ny AWS Quick Sight.

Handinika safidy roa amin'ny fametrahana ny fotodrafitrasa manontolo isika:

  • Manual - amin'ny alàlan'ny AWS Management Console;
  • Ny fotodrafitrasa avy amin'ny code Terraform dia ho an'ny automatique kamo;

Architecture ny rafitra novolavolaina

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Ireo singa ampiasaina:

  • Aviasales API - ny angona naverin'ity API ity dia hampiasaina amin'ny asa manaraka rehetra;
  • Ohatra mpamokatra EC2 - milina virtoaly mahazatra ao amin'ny rahona izay hamoahana ny stream data fampidirana:
    • Kinesis Agent dia fampiharana Java napetraka eo an-toerana amin'ny milina izay manome fomba mora hanangonana sy handefasana data amin'ny Kinesis (Kinesis Data Streams na Kinesis Firehose). Manara-maso tsy tapaka ny andian-drakitra ao amin'ny lahatahiry voatondro ny mpiasa ary mandefa angona vaovao amin'ny Kinesis;
    • API Caller Script - script Python izay manao fangatahana amin'ny API ary mametraka ny valiny ao anaty lahatahiry izay arahin'ny Kinesis Agent;
  • Kinesis Data Streams - Serivisy fandefasana data amin'ny fotoana tena izy miaraka amin'ny fahaiza-manao scaling midadasika;
  • Kinesis Analytics dia serivisy tsy misy mpizara izay manatsotra ny famakafakana ny angon-drakitra mivantana amin'ny fotoana tena izy. Ny Amazon Kinesis Data Analytics dia manitsy ny loharanon'ny rindranasa ary mizana ho azy mba hikarakarana ny habetsaky ny angona miditra;
  • Zanak'i Lambda — serivisy ahafahanao mampandeha kaody tsy misy backup na fametrahana lohamilina. Ny herin'ny informatika rehetra dia ahena ho azy isaky ny antso;
  • Amazon DynamoDB - angon-drakitra misy tsiroaroa manan-danja sy antontan-taratasy izay manome fahatarana latsaky ny 10 milisegondra rehefa mandeha amin'ny ambaratonga rehetra. Rehefa mampiasa DynamoDB ianao dia tsy mila manome, mametaka, na mitantana mpizara. Ny DynamoDB dia manitsy ny latabatra ho azy mba hanitsiana ny habetsahan'ny loharanon-karena misy sy hitazonana ny fahombiazany. Tsy ilaina ny fitantanana rafitra;
  • Amazon SNS - serivisy tantanina tanteraka amin'ny fandefasana hafatra amin'ny fampiasana ny maodely mpamoaka lahatsoratra-mpanoratra (Pub/Sub), izay ahafahanao mitoka-monina microservices, rafitra zaraina ary fampiharana tsy misy mpizara. Ny SNS dia azo ampiasaina handefasana vaovao amin'ireo mpampiasa farany amin'ny alàlan'ny fampandrenesana fanerena finday, hafatra SMS ary mailaka.

Fiofanana voalohany

Mba haka tahaka ny fikorianan'ny angona dia nanapa-kevitra ny hampiasa ny fampahalalana momba ny tapakila fiaramanidina naverin'ny Aviasales API aho. IN tahirin-kevitra lisitra be dia be amin'ny fomba isan-karazany, andao haka ny iray amin'izy ireo - "Kalandrie vidin'ny volana", izay mamerina ny vidin'ny andro tsirairay amin'ny volana, natambatra amin'ny isan'ny famindrana. Raha tsy mamaritra ny volana fikarohana ao amin'ny fangatahana ianao, dia haverina amin'ny volana manaraka ny iray ankehitriny ny fampahalalana.

Noho izany, andao hisoratra anarana ary hahazo ny mari-pamantaranay.

Ireto misy ohatra iray fangatahana:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Ny fomba etsy ambony amin'ny fandraisana angona avy amin'ny API amin'ny alàlan'ny fanondroana marika ao amin'ny fangatahana dia hiasa, fa aleoko mandalo ny famantarana fidirana amin'ny lohapejy, noho izany dia hampiasa ity fomba ity amin'ny script api_caller.py izahay.

Valiny ohatra:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Ny valin'ny API ohatra etsy ambony dia mampiseho tapakila avy any Saint-Pétersbourg mankany Phuk... Oh, nofinofy toy inona...
Koa satria avy any Kazan aho, ary "nofy fotsiny" izao i Phuket, andao hitady tapakila avy any Saint-Pétersbourg mankany Kazan.

Heverina fa efa manana kaonty AWS ianao. Te-hisarika ny saina manokana avy hatrany aho fa tsy tafiditra ao anatin'ny isan-taona ny Kinesis sy ny fandefasana fampandrenesana amin'ny alàlan'ny SMS Tier maimaim-poana (fampiasana maimaim-poana). Saingy na dia eo aza izany, miaraka amin'ny dolara roa ao an-tsaina, dia azo atao tsara ny manangana ny rafitra natolotra ary milalao miaraka aminy. Ary, mazava ho azy, aza adino ny mamafa ny loharano rehetra rehefa tsy ilaina intsony.

Soa ihany fa maimaim-poana ho antsika ny fiasa DynamoDb sy lambda raha mahafeno ny fetra maimaim-poana isam-bolana. Ohatra, ho an'ny DynamoDB: fitahirizana 25 GB, 25 WCU/RCU ary fanontaniana 100 tapitrisa. Ary antso lambda iray tapitrisa isam-bolana.

Fametrahana rafitra manual

Mametraka Kinesis Data Streams

Andao ho any amin'ny serivisy Kinesis Data Streams ary mamorona renirano vaovao roa, shard iray ho an'ny tsirairay.

Inona no atao hoe shard?
Ny shard dia singa fototra famindrana angon-drakitra amin'ny renirano Amazon Kinesis. Ny fizarana iray dia manome famindrana angona fidirana amin'ny hafainganam-pandeha 1 MB/s ary famindrana angon-drakitra mivoaka amin'ny hafainganam-pandeha 2 MB/s. Ny fizarana iray dia manohana ny fidirana PUT 1000 isan-tsegondra. Rehefa mamorona angon-drakitra dia mila mamaritra ny isan'ny fizarana ilaina ianao. Ohatra, azonao atao ny mamorona stream data misy fizarana roa. Ity stream data ity dia hanome famindrana angona fidirana amin'ny 2 MB/s ary famindrana angon-drakitra mivoaka amin'ny 4 MB/s, manohana hatramin'ny 2000 PUT firaketana isan-tsegondra.

Arakaraky ny shards ao amin'ny stream-nao no betsaka kokoa ny fivoahany. Amin'ny maha-fitsipika, toy izany no fomba mikoriana - amin'ny fampidirana shards. Saingy arakaraka ny anananao shards no avo kokoa ny vidiny. Ny shard tsirairay dia mitentina 1,5 cents isan'ora ary 1.4 cents fanampiny ho an'ny vondrona PUT iray tapitrisa.

Andao hamorona stream vaovao miaraka amin'ny anarana tapakila_ fiaramanidina, 1 sombintsombiny dia ampy ho azy:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Andeha isika hamorona kofehy hafa miaraka amin'ny anarana special_stream:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara

Fametrahana mpamokatra

Mba hamakafaka asa iray dia ampy ny mampiasa ohatra EC2 mahazatra ho mpamokatra angona. Tsy voatery ho milina virtoaly mahery sy lafo vidy izy io; mety tsara ny toerana t2.micro.

Fanamarihana manan-danja: ohatra, tokony hampiasa sary ianao - Amazon Linux AMI 2018.03.0, manana toe-javatra vitsy kokoa ho an'ny fandefasana haingana ny Kinesis Agent.

Mankanesa any amin'ny serivisy EC2, mamorona milina virtoaly vaovao, safidio ny AMI tianao amin'ny karazana t2.micro, izay tafiditra ao amin'ny Free Tier:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Mba hahafahan'ilay milina virtoaly vao noforonina hifanerasera amin'ny serivisy Kinesis dia tsy maintsy omena zo hanao izany. Ny fomba tsara indrindra hanaovana izany dia ny manendry anjara IAM. Noho izany, amin'ny dingana 3: Configure Instance Details screen, tokony hifidy ianao Mamorona Role IAM vaovao:

Mamorona anjara IAM ho an'ny EC2
Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Ao amin'ny varavarankely misokatra, safidio fa mamorona andraikitra vaovao ho an'ny EC2 isika ary mandehana any amin'ny fizarana Fahazoan-dalana:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Amin'ny fampiasana ny ohatra fanofanana, tsy mila miditra amin'ny saro-takarina rehetra amin'ny fanefena madinidinika momba ny zon'ny loharanon-karena isika, noho izany dia hifidy ireo politika efa nomanin'ny Amazon mialoha izahay: AmazonKinesisFullAccess sy CloudWatchFullAccess.

Andeha isika hanome anarana manan-danja ho an'ity andraikitra ity, ohatra: EC2-KinesisStreams-FullAccess. Ny vokatra dia tokony hitovy amin'ny aseho amin'ny sary etsy ambany:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Aorian'ny famoronana ity andraikitra vaovao ity dia aza adino ny mampiditra azy amin'ny ohatra milina virtoaly noforonina:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Tsy manova zavatra hafa amin'ity efijery ity izahay ary mandroso mankany amin'ny varavarankely manaraka.

Ny firafitry ny kapila mafy dia azo avela ho default, ary koa ny marika (na dia fanao tsara aza ny mampiasa marika, farafaharatsiny manome anarana ny ohatra ary manondro ny tontolo iainana).

Amin'izao fotoana izao isika dia eo amin'ny dingana 6: Configure Security Group tab, izay mila mamorona vaovao na mamaritra ny vondrona Security misy anao, izay ahafahanao mifandray amin'ny ssh (port 22) amin'ny ohatra. Safidio ny Source -> My IP ao ary azonao atao ny manomboka ny ohatra.

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Raha vantany vao mivadika amin'ny sata mihazakazaka izy dia azonao atao ny manandrana mampifandray azy amin'ny ssh.

Mba hahafahana miara-miasa amin'ny Kinesis Agent, aorian'ny fifandraisana am-pahombiazana amin'ny milina dia tsy maintsy ampidirinao ao amin'ny terminal ireto baiko manaraka ireto:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Andao hamorona lahatahiry hitehirizana valiny API:

sudo mkdir /var/log/airline_tickets

Alohan'ny hanombohana ny agence dia mila manamboatra ny config:

sudo vim /etc/aws-kinesis/agent.json

Ny votoatin'ny rakitra agent.json dia tokony ho toy izao:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Araka ny hita amin'ny fichier configuration, dia hanara-maso ireo rakitra miaraka amin'ny fanitarana .log ao amin'ny lahatahiry /var/log/airline_tickets/ ny mpitsikilo, hamafa azy ireo ary hamindra azy ireo amin'ny stream airline_tickets.

Averinay indray ny serivisy ary ataovy azo antoka fa mandeha sy mandeha:

sudo service aws-kinesis-agent restart

Andeha isika haka ny script Python izay hangataka angona avy amin'ny API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Ny script api_caller.py dia mangataka angona avy amin'ny Aviasales ary mitahiry ny valiny voaray ao amin'ny lahatahiry izay nojeren'ny masoivoho Kinesis. Ny fampiharana an'ity script ity dia somary manara-penitra, misy kilasy TicketsApi, ahafahanao misintona ny API amin'ny asynchronously. Mandalo lohapejy misy marika izahay ary mangataka mari-pamantarana amin'ity kilasy ity:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Mba hitsapana ny toe-javatra marina sy ny fiasan'ny mpiasa, andeha hozahana ny script api_caller.py:

sudo ./api_caller.py TOKEN

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Ary mijery ny vokatry ny asa ao amin'ny diarin'ny Agent sy amin'ny tabilao Fanaraha-maso ao amin'ny stream data airline_tickets izahay:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Araka ny hitanao dia mandeha ny zava-drehetra ary ny Kinesis Agent dia nahomby tamin'ny fandefasana angon-drakitra amin'ny renirano. Andeha hodinihintsika izao ny mpanjifa.

Fametrahana Kinesis Data Analytics

Andao hiroso amin'ny singa afovoan'ny rafitra manontolo - mamorona fampiharana vaovao ao amin'ny Kinesis Data Analytics antsoina hoe kinesis_analytics_airlines_app:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Ny Kinesis Data Analytics dia ahafahanao manao famakafakana data amin'ny fotoana tena izy avy amin'ny Kinesis Streams mampiasa ny fiteny SQL. Izy io dia serivisy autoscaling tanteraka (tsy toy ny Kinesis Streams) izay:

  1. mamela anao hamorona stream vaovao (Output Stream) mifototra amin'ny fangatahana loharanon-kevitra;
  2. manome stream misy hadisoana nitranga nandritra ny fampiharana (Error Stream);
  3. afaka mamaritra ho azy ny rafitra angon-drakitra fampidirana (azo amboarina amin'ny tanana izany raha ilaina).

Tsy serivisy mora izany - 0.11 USD isaky ny ora fiasana, noho izany dia tokony hampiasainao amim-pitandremana sy hamafa izany rehefa vita ianao.

Andao hampifandray ny fampiharana amin'ny loharano angona:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Safidio ny stream izay hifandraisantsika (airline_tickets):

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Manaraka, mila mametaka Role IAM vaovao ianao mba hahafahan'ny fampiharana mamaky avy amin'ny stream sy manoratra amin'ny stream. Mba hanaovana izany dia ampy ny tsy manova na inona na inona ao amin'ny sakana fahazoan-dàlana Access:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Andeha hojerentsika izao ny schema data ao amin'ny stream; Mba hanaovana izany, tsindrio ny bokotra "Discover schema". Vokatr'izany dia havaozina (voaforona) ny andraikitry ny IAM ary hatomboka amin'ny angon-drakitra efa tonga tao amin'ny riaka ny fitadiavana skema:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Ankehitriny dia mila mandeha any amin'ny tonian-dahatsoratra SQL ianao. Rehefa manindry an'ity bokotra ity ianao dia hisy varavarankely iray hiseho mangataka anao handefa ny fampiharana - safidio izay tianao hatomboka:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Ampidiro ao amin'ny varavarankelin'ny tonian-dahatsoratra SQL ity fanontaniana tsotra manaraka ity ary tsindrio Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Amin'ny angon-drakitra mifandraika, miara-miasa amin'ny latabatra ianao amin'ny fampiasana fanambarana INSERT hanampiana rakitsoratra sy fanambarana SELECT mba hangataka angona. Ao amin'ny Amazon Kinesis Data Analytics, miara-miasa amin'ny streams (STREAMs) sy paompy (PUMP) ianao—fangatahana fampidirana mitohy izay mampiditra angona avy amin'ny stream iray amin'ny rindranasa iray mankany amin'ny stream hafa.

Ny fangatahana SQL aseho etsy ambony dia mikaroka tapakila Aeroflot amin'ny vidiny latsaky ny dimy arivo roubles. Ny rakitra rehetra mahafeno ireo fepetra ireo dia hapetraka ao amin'ny stream DESTINATION_SQL_STREAM.

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Ao amin'ny sakana Destination, safidio ny stream special_stream, ary ao amin'ny lisitry ny fampidinana anarana DESTINATION_SQL_STREAM amin'ny fampiharana:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Ny vokatry ny fanodinkodinana rehetra dia tokony hitovy amin'ny sary etsy ambany:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara

Mamorona sy misoratra anarana amin'ny lohahevitra SNS

Mankanesa any amin'ny serivisy fampandrenesana tsotra ary mamorona lohahevitra vaovao miaraka amin'ny anarana hoe Airlines:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Misoratra anarana amin'ity lohahevitra ity ary manondro ny nomeraon-telefaona handefasana fampandrenesana SMS:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara

Mamorona tabilao ao amin'ny DynamoDB

Raha te hitahiry ny angona manta avy amin'ny reniranon'ny airline_tickets dia andao hamorona latabatra ao amin'ny DynamoDB miaraka amin'ny anarana mitovy. Hampiasa record_id ho fanalahidy fototra izahay:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara

Mamorona mpanangom-bokatra lambda

Andeha isika hamorona asa lambda antsoina hoe Collector, izay ny andraikiny dia ny fitsapan-kevitra ny zotra airline_tickets ary, raha misy firaketana vaovao hita ao, dia ampidiro ao amin'ny latabatra DynamoDB ireo rakitra ireo. Mazava ho azy, ankoatry ny zon'ny default, ity lambda ity dia tsy maintsy namaky ny fidirana amin'ny angon-drakitra Kinesis ary manoratra fidirana amin'ny DynamoDB.

Mamorona anjara IAM ho an'ny asa mpanangona lambda
Voalohany, andao hamorona andraikitry IAM vaovao ho an'ny lambda antsoina hoe Lambda-TicketsProcessingRole:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Ho an'ny ohatra fitsapana, ny politikan'ny AmazonKinesisReadOnlyAccess sy AmazonDynamoDBFullAccess efa namboarina mialoha dia mety tsara, araka ny aseho amin'ny sary etsy ambany:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara

Ity lambda ity dia tokony halefa amin'ny trigger avy amin'ny Kinesis rehefa miditra ao amin'ny airline_stream ny fidirana vaovao, noho izany dia mila manampy trigger vaovao isika:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Ny hany sisa tavela dia ny mampiditra ny kaody ary mamonjy ny lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Mamorona fampandrenesana fiasa lambda

Ny fiasa lambda faharoa, izay hanara-maso ny renirano faharoa (special_stream) ary handefa fampahafantarana amin'ny SNS, dia noforonina amin'ny fomba mitovy. Noho izany, ity lambda ity dia tsy maintsy manana fahafahana mamaky avy amin'ny Kinesis ary mandefa hafatra amin'ny lohahevitra SNS nomena, izay halefa amin'ny serivisy SNS amin'ny mpanjifa rehetra amin'ity lohahevitra ity (mailaka, SMS, sns.).

Famoronana anjara IAM
Voalohany, mamorona ny andraikitry IAM Lambda-KinesisAlarm ho an'ity lambda ity izahay, ary avy eo dia manendry ity andraikitra ity amin'ny lambda alarm_notifier noforonina:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara

Ity lambda ity dia tokony hiasa amin'ny trigger ho an'ny rakitra vaovao hiditra ao amin'ny special_stream, noho izany dia mila manamboatra ny trigger ianao amin'ny fomba mitovy amin'ny nataonay tamin'ny Collector lambda.

Mba hanamora kokoa ny fanamboarana an'ity lambda ity, andao hampiditra fari-piainana vaovao - TOPIC_ARN, izay ametrahanay ny ANR (Amazon Recourse Names) amin'ny lohahevitra Airlines:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Ary ampidiro ny code lambda, tsy sarotra mihitsy izany:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Toa eto no vita ny fanamboarana ny rafitra manual. Ny hany sisa tavela dia ny fitsapana sy ny fanaovana antoka fa voarindra tsara ny zava-drehetra.

Alefaso avy amin'ny code Terraform

Fiomanana ilaina

Terraform dia fitaovana open source tena mety amin'ny fametrahana fotodrafitrasa avy amin'ny code. Izy io dia manana ny syntax manokana izay mora ianarana ary manana ohatra maro momba ny fomba sy ny zavatra hapetraka. Ny mpamoaka lahatsoratra Atom na Visual Studio Code dia manana plugins mora ampiasaina izay manamora ny fiaraha-miasa amin'ny Terraform.

Azonao alaina ny fizarana avy eto. Ny famakafakana amin'ny antsipiriany momba ny fahaizan'ny Terraform rehetra dia tsy tafiditra ao anatin'ity lahatsoratra ity, ka hametra ny tenantsika amin'ireo hevi-dehibe isika.

Ahoana ny fomba hanombohana

Ny fehezan-dalàna feno amin'ny tetikasa dia ao amin'ny fitehirizako. Isika dia manao clone ny tahiry ho antsika. Alohan'ny hanombohanao dia mila mahazo antoka ianao fa manana AWS CLI napetraka sy namboarina, satria ... Terraform dia hitady fahazoan-dàlana ao amin'ny rakitra ~/.aws/credentials.

Ny fanao tsara dia ny fampandehanana ny baikon'ny drafitra alohan'ny hametrahana ny fotodrafitrasa manontolo hahitana ny zavatra noforonin'i Terraform ho antsika amin'izao fotoana izao ao amin'ny rahona:

terraform.exe plan

Hasaina ianao hampiditra laharan-telefaona handefasana fampahafantarana. Tsy ilaina ny miditra amin'io dingana io.

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Rehefa avy nandinika ny drafitry ny fampandehanana ny programa isika dia afaka manomboka mamorona loharano:

terraform.exe apply

Aorian'ny fandefasana ity baiko ity dia hangatahana indray ianao hampiditra nomeraon-telefaona; antsoy ny "eny" rehefa misy fanontaniana momba ny fanatanterahana ny hetsika. Izany dia ahafahanao manangana ny fotodrafitrasa iray manontolo, manatanteraka ny fanamafisana rehetra ilaina amin'ny EC2, mametraka ny fiasa lambda, sns.

Rehefa vita soa aman-tsara ny loharanon-karena rehetra tamin'ny alàlan'ny code Terraform dia mila miditra amin'ny antsipirian'ny fampiharana Kinesis Analytics ianao (indrisy fa tsy hitako ny fomba hanaovana izany mivantana avy amin'ny code).

Alefaso ny fampiharana:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Aorian'izany dia tsy maintsy mametraka mazava ny anaran'ny stream in-application ianao amin'ny alàlan'ny fisafidianana avy amin'ny lisitra midina:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Ankehitriny dia vonona ny handeha ny zava-drehetra.

Fitsapana ny fampiharana

Na manao ahoana na manao ahoana ny fametrahanao ny rafitra, amin'ny tanana na amin'ny alàlan'ny kaody Terraform, dia hiasa toy izany koa.

Miditra amin'ny alàlan'ny SSH amin'ny milina virtoaly EC2 izay ametrahana ny Kinesis Agent ary mampandeha ny script api_caller.py

sudo ./api_caller.py TOKEN

Ny hany tsy maintsy ataonao dia miandry SMS amin'ny laharanao:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
SMS - misy hafatra tonga amin'ny finday afaka 1 minitra:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara
Mbola hojerena raha voatahiry ao amin'ny angon-drakitra DynamoDB ireo rakitra ho an'ny famakafakana amin'ny antsipiriany kokoa. Ny tabilao airline_tickets dia ahitana ireto angona manaraka ireto:

Aviasales API fampidirana miaraka amin'ny Amazon Kinesis sy ny fahatsorana tsy misy mpizara

famaranana

Nandritra ny asa vita dia natsangana ny rafitra fanodinana angon-drakitra an-tserasera mifototra amin'ny Amazon Kinesis. Ny safidy amin'ny fampiasana ny Kinesis Agent miaraka amin'ny Kinesis Data Streams sy ny fanadihadiana amin'ny fotoana tena izy Kinesis Analytics mampiasa baiko SQL, ary koa ny fifandraisan'ny Amazon Kinesis amin'ny serivisy AWS hafa dia nodinihina.

Nalefanay tamin'ny fomba roa ity rafitra etsy ambony ity: boky iray lava be ary haingana avy amin'ny code Terraform.

Ny kaody loharanon'ny tetikasa rehetra dia misy ao amin'ny tahiry GitHub-ko, manoro hevitra anao aho mba hahalalanao izany.

Faly aho miresaka momba ilay lahatsoratra, manantena ny fanehoan-kevitrao aho. Manantena ny fanakianana manorina aho.

Maniry anao hahita fahombiazana aho!

Source: www.habr.com

Add a comment