Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano

Hoy Habr!

Ganahan ka ba sa pagpalupad sa eroplano? Ganahan ko niini, apan sa panahon sa pag-inusara sa kaugalingon nahigugma usab ako sa pag-analisar sa mga datos sa mga tiket sa hangin gikan sa usa ka bantog nga kapanguhaan - Aviasales.

Karon atong analisahon ang trabaho sa Amazon Kinesis, magtukod og streaming system nga adunay real-time nga analytics, i-install ang database sa Amazon DynamoDB NoSQL isip nag-unang pagtipig sa datos, ug i-set up ang mga notipikasyon sa SMS alang sa makapaikag nga mga tiket.

Ang tanan nga mga detalye naa sa ilawom sa pagputol! Lakaw!

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano

Pasiuna

Alang sa pananglitan, kinahanglan namon ang pag-access Aviasales API. Ang pag-access niini gihatag nga wala’y bayad ug wala’y mga pagdili; kinahanglan nimo nga magparehistro sa seksyon nga "Mga Nag-develop" aron madawat ang imong token sa API aron ma-access ang datos.

Ang nag-unang katuyoan niini nga artikulo mao ang paghatag sa usa ka kinatibuk-ang pagsabot sa paggamit sa impormasyon streaming sa AWS; among gikonsiderar nga ang data nga gibalik sa API nga gigamit mao ang dili estrikto nga up-to-date ug gipasa gikan sa cache, nga mao ang naporma base sa mga pagpangita sa mga tiggamit sa Aviasales.ru ug Jetradar.com nga mga site sulod sa miaging 48 ka oras.

Ang Kinesis-agent, nga na-install sa makina nga naghimo, nga nadawat pinaagi sa API awtomatik nga mag-parse ug magpadala sa datos sa gusto nga sapa pinaagi sa Kinesis Data Analytics. Ang hilaw nga bersyon niini nga sapa isulat direkta sa tindahan. Ang hilaw nga pagtipig sa datos nga gipakatap sa DynamoDB magtugot alang sa mas lawom nga pagtuki sa tiket pinaagi sa mga himan sa BI, sama sa AWS Quick Sight.

Atong hisgotan ang duha ka kapilian sa pag-deploy sa tibuok imprastraktura:

  • Manwal - pinaagi sa AWS Management Console;
  • Ang imprastraktura gikan sa Terraform code alang sa mga tapolan nga automators;

Ang arkitektura sa naugmad nga sistema

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Gigamit nga mga sangkap:

  • Aviasales API β€” ang datos nga gibalik niini nga API gamiton para sa tanang sunod nga trabaho;
  • EC2 Producer Instance β€” usa ka regular nga virtual machine sa panganod diin ang input data stream mabuhat:
    • Ahente sa Kinesis maoy usa ka Java nga aplikasyon nga gi-install sa lokal sa makina nga naghatag ug sayon ​​nga paagi sa pagkolekta ug pagpadala sa datos ngadto sa Kinesis (Kinesis Data Streams o Kinesis Firehose). Ang ahente kanunay nga nag-monitor sa usa ka hugpong sa mga file sa gitakda nga mga direktoryo ug nagpadala og bag-ong datos sa Kinesis;
    • API Caller Script β€” Usa ka script sa Python nga naghangyo sa API ug nagbutang sa tubag sa usa ka folder nga gibantayan sa Kinesis Agent;
  • Kinesis Data Streams - serbisyo sa real-time nga data streaming nga adunay daghang kapabilidad sa pag-scale;
  • Kinesis Analytics usa ka serbisyo nga wala’y server nga gipasimple ang pag-analisar sa streaming data sa tinuud nga oras. Ang Amazon Kinesis Data Analytics nag-configure sa mga kapanguhaan sa aplikasyon ug awtomatik nga nagtimbang aron pagdumala sa bisan unsang gidaghanon sa umaabot nga datos;
  • AWS Lambda β€” usa ka serbisyo nga nagtugot kanimo sa pagpadagan sa code nga wala’y pag-backup o pag-set up sa mga server. Ang tanan nga gahum sa pag-compute awtomatik nga gi-scale alang sa matag tawag;
  • Amazon DynamoDB - Usa ka database sa key-value pairs ug mga dokumento nga naghatag ug latency nga ubos sa 10 milliseconds kung nagdagan sa bisan unsang sukod. Kung gigamit ang DynamoDB, dili nimo kinahanglan nga maghatag, mag-patch, o magdumala sa bisan unsang mga server. Ang DynamoDB awtomatik nga nag-scale sa mga lamesa aron ma-adjust ang gidaghanon sa anaa nga mga kapanguhaan ug mamentinar ang taas nga performance. Wala’y kinahanglan nga pagdumala sa sistema;
  • Amazon SNS - usa ka hingpit nga pagdumala nga serbisyo alang sa pagpadala sa mga mensahe gamit ang modelo sa publisher-subscriber (Pub/Sub), diin mahimo nimong ihimulag ang mga microservice, distributed system ug serverless nga mga aplikasyon. Ang SNS mahimong gamiton sa pagpadala og impormasyon ngadto sa mga end user pinaagi sa mobile push notifications, SMS messages ug emails.

Inisyal nga pagbansay

Aron masundog ang dagan sa datos, nakahukom ko nga gamiton ang impormasyon sa tiket sa eroplano nga gibalik sa Aviasales API. SA dokumentasyon usa ka halapad nga lista sa lainlaing mga pamaagi, kuhaa ang usa niini - "Buwan nga Kalendaryo sa Presyo", nga nagbalik sa mga presyo alang sa matag adlaw sa bulan, nga gi-grupo sa gidaghanon sa mga pagbalhin. Kung dili nimo ipiho ang bulan sa pagpangita sa hangyo, ang kasayuran ibalik alang sa bulan pagkahuman sa karon.

Busa, magparehistro kita ug kuhaon ang atong token.

Usa ka pananglitan nga hangyo anaa sa ubos:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Ang pamaagi sa ibabaw sa pagdawat sa datos gikan sa API pinaagi sa pagtino sa usa ka token sa hangyo molihok, apan mas gusto nako nga ipasa ang access token pinaagi sa header, mao nga atong gamiton kini nga pamaagi sa api_caller.py script.

Pananglitan sa tubag:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Ang pananglitan nga tubag sa API sa ibabaw nagpakita sa usa ka tiket gikan sa St. Petersburg ngadto sa Phuk... Oh, unsa ka damgo...
Tungod kay ako gikan sa Kazan, ug ang Phuket "usa lamang ka damgo", atong pangitaon ang mga tiket gikan sa St. Petersburg ngadto sa Kazan.

Nagtuo kini nga ikaw adunay usa ka AWS account. Gusto nako nga hatagan dayon ang espesyal nga atensyon sa kamatuoran nga ang Kinesis ug pagpadala mga pahibalo pinaagi sa SMS wala gilakip sa tinuig Libre nga Tier (libre nga paggamit). Apan bisan pa niini, nga adunay usa ka magtiayon nga dolyar sa hunahuna, posible nga matukod ang gisugyot nga sistema ug magdula niini. Ug, siyempre, ayaw kalimti ang pagtangtang sa tanan nga mga kahinguhaan pagkahuman nga dili na kini kinahanglan.

Maayo na lang, ang DynamoDb ug lambda function mahimong libre alang kanamo kung among makab-ot ang among binulan nga libre nga mga limitasyon. Pananglitan, alang sa DynamoDB: 25 GB sa pagtipig, 25 WCU/RCU ug 100 milyon nga mga pangutana. Ug usa ka milyon nga lambda function nga tawag matag bulan.

Manwal nga pag-deploy sa sistema

Pag-set up sa Kinesis Data Streams

Adto kita sa serbisyo sa Kinesis Data Streams ug paghimo og duha ka bag-ong sapa, usa ka shard alang sa matag usa.

Unsa ang usa ka shard?
Ang usa ka shard mao ang sukaranan nga yunit sa pagbalhin sa datos sa usa ka sapa sa Amazon Kinesis. Ang usa ka bahin naghatag ug input data transfer sa gikusgon nga 1 MB/s ug output data transfer sa gikusgon nga 2 MB/s. Ang usa ka bahin nagsuporta hangtod sa 1000 ka PUT nga mga entry matag segundo. Sa paghimo sa usa ka stream sa datos, kinahanglan nimo nga ipiho ang gikinahanglan nga gidaghanon sa mga bahin. Pananglitan, makahimo ka og data stream nga adunay duha ka bahin. Kini nga data stream maghatag input data transfer sa 2 MB/s ug output data transfer sa 4 MB/s, pagsuporta sa 2000 PUT records kada segundo.

Ang mas daghang shards sa imong stream, mas dako ang throughput niini. Sa prinsipyo, kini ang paagi nga ang mga agos gi-scale - pinaagi sa pagdugang sa mga shards. Apan kon mas daghan ang imong nabatonan, mas taas ang presyo. Ang matag shard nagkantidad ug 1,5 sentimos kada oras ug dugang 1.4 sentimos sa matag milyon nga PUT payload units.

Magbuhat ta ug bag-ong sapa nga adunay ngalan airline_tickets, 1 ka tipik igo na alang kaniya:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Karon maghimo ta ug laing thread nga naay ngalan espesyal nga_stream:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano

Setup sa producer

Aron ma-analisar ang usa ka buluhaton, igo na nga mogamit usa ka regular nga pananglitan sa EC2 ingon usa ka prodyuser sa datos. Kini dili kinahanglan nga usa ka gamhanan, mahal nga virtual nga makina; ang usa ka lugar nga t2.micro maayo ra.

Importante nga nota: pananglitan, kinahanglan nimo gamiton ang imahe - Amazon Linux AMI 2018.03.0, kini adunay mas gamay nga mga setting alang sa dali nga paglansad sa Kinesis Agent.

Lakaw ngadto sa serbisyo sa EC2, paghimo og bag-ong virtual machine, pilia ang gusto nga AMI nga adunay tipo nga t2.micro, nga gilakip sa Free Tier:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Aron ang bag-ong gimugna nga virtual machine makahimo sa pagpakig-uban sa Kinesis nga serbisyo, kini kinahanglan nga hatagan ug katungod sa pagbuhat niini. Ang pinakamaayong paagi sa pagbuhat niini mao ang pag-assign og IAM Role. Busa, sa Lakang 3: I-configure ang Mga Detalye sa Instance screen, kinahanglan nimo nga pilion Paghimo og bag-ong IAM Role:

Paghimo usa ka tahas sa IAM alang sa EC2
Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Sa bintana nga nagbukas, pilia nga naghimo kami usa ka bag-ong papel alang sa EC2 ug adto sa seksyon sa Mga Permiso:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Gamit ang panig-ingnan sa pagbansay, dili na namo kinahanglan nga moadto sa tanan nga mga intricacies sa granular configuration sa mga katungod sa kahinguhaan, mao nga among pilion ang mga polisiya nga pre-configured sa Amazon: AmazonKinesisFullAccess ug CloudWatchFullAccess.

Hatagan nato ang pipila ka makahuluganon nga ngalan alang niini nga tahas, pananglitan: EC2-KinesisStreams-FullAccess. Ang resulta kinahanglan nga parehas sa gipakita sa litrato sa ubos:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Pagkahuman sa paghimo niining bag-ong tahas, ayaw kalimti nga ilakip kini sa gibuhat nga virtual machine nga pananglitan:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Wala kami magbag-o bisan unsa sa kini nga screen ug magpadayon sa sunod nga mga bintana.

Ang mga setting sa hard drive mahimong ibilin ingon nga default, ingon man ang mga tag (bisan kung maayo nga praktis ang paggamit sa mga tag, labing menos hatagan ang pananglitan sa usa ka ngalan ug ipakita ang palibot).

Karon naa kami sa Lakang 6: I-configure ang Security Group tab, diin kinahanglan nimo nga maghimo usa ka bag-o o ipiho ang imong kasamtangan nga grupo sa Seguridad, nga nagtugot kanimo sa pagkonektar pinaagi sa ssh (port 22) sa pananglitan. Pilia ang Tinubdan -> Akong IP didto ug mahimo nimong ilunsad ang pananglitan.

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Sa diha nga kini mobalhin sa running status, mahimo nimong sulayan ang pagkonektar niini pinaagi sa ssh.

Aron makahimo sa pagtrabaho uban sa Kinesis Agent, human sa malampuson nga pagkonektar sa makina, kamo kinahanglan gayud nga mosulod sa mosunod nga mga sugo sa terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Maghimo kita og folder aron i-save ang mga tubag sa API:

sudo mkdir /var/log/airline_tickets

Sa wala pa magsugod ang ahente, kinahanglan nimo nga i-configure ang config niini:

sudo vim /etc/aws-kinesis/agent.json

Ang mga sulod sa agent.json file kinahanglan tan-awon sama niini:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Ingon sa makita gikan sa configuration file, ang ahente mag-monitor sa mga file nga adunay .log extension sa /var/log/airline_tickets/ directory, i-parse kini ug ibalhin kini ngadto sa airline_tickets stream.

Gisugdan namon pag-usab ang serbisyo ug gisiguro nga kini nagdagan ug nagdagan:

sudo service aws-kinesis-agent restart

Karon atong i-download ang Python script nga mangayo og data gikan sa API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Ang script sa api_caller.py nangayo ug datos gikan sa Aviasales ug gitipigan ang nadawat nga tubag sa direktoryo nga gi-scan sa ahente sa Kinesis. Ang pagpatuman sa kini nga script medyo sukaranan, adunay klase nga TicketsApi, gitugotan ka nga asynchronously ibira ang API. Gipasa namo ang usa ka header nga adunay token ug nangayo og mga parameter niini nga klase:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Aron masulayan ang husto nga mga setting ug gamit sa ahente, atong sulayan pagdagan ang api_caller.py script:

sudo ./api_caller.py TOKEN

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Ug among gitan-aw ang resulta sa trabaho sa Agent logs ug sa Monitoring tab sa airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Sama sa imong nakita, ang tanan nagtrabaho ug ang Kinesis Agent malampuson nga nagpadala sa datos sa sapa. Karon atong i-configure ang consumer.

Pag-set up sa Kinesis Data Analytics

Mopadayon kita sa sentro nga bahin sa tibuok sistema - paghimo ug bag-ong aplikasyon sa Kinesis Data Analytics nga ginganlag kinesis_analytics_airlines_app:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Ang Kinesis Data Analytics nagtugot kanimo sa paghimo sa real-time nga data analytics gikan sa Kinesis Streams gamit ang SQL nga pinulongan. Kini usa ka bug-os nga autoscaling nga serbisyo (dili sama sa Kinesis Streams) nga:

  1. nagtugot kanimo sa paghimo og bag-ong mga sapa (Output Stream) base sa mga hangyo sa tinubdan sa datos;
  2. naghatag sa usa ka sapa nga adunay mga sayup nga nahitabo samtang nagdagan ang mga aplikasyon (Error Stream);
  3. mahimong awtomatik nga mahibal-an ang laraw sa datos sa pag-input (kini mahimo nga mano-mano nga pag-usab kung kinahanglan).

Dili kini usa ka barato nga serbisyo - 0.11 USD kada oras sa pagtrabaho, mao nga kinahanglan nimo kining gamiton pag-ayo ug tangtangon kini kung mahuman ka.

Atong ikonektar ang aplikasyon sa tinubdan sa datos:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Pilia ang sapa nga among sumpayan (airline_tickets):

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Sunod, kinahanglan nimo nga maglakip sa usa ka bag-ong IAM Role aron ang aplikasyon makabasa gikan sa sapa ug makasulat sa sapa. Aron mahimo kini, igo na nga dili usbon ang bisan unsang butang sa block permiso sa Access:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Karon atong hangyoon ang pagdiskobre sa data schema sa stream; aron mahimo kini, i-klik ang "Discover schema" nga buton. Ingon nga resulta, ang papel sa IAM ma-update (usa ka bag-o ang pagabuhaton) ug ang schema detection ilunsad gikan sa datos nga miabot na sa sapa:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Karon kinahanglan nimo nga moadto sa editor sa SQL. Kung imong i-klik kini nga buton, usa ka bintana ang makita nga naghangyo kanimo sa paglansad sa aplikasyon - pilia kung unsa ang gusto nimo ilunsad:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Isulod ang mosunud nga yano nga pangutana sa bintana sa editor sa SQL ug i-klik ang Save ug Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Sa mga relational database, nagtrabaho ka sa mga lamesa gamit ang INSERT nga mga pahayag aron idugang ang mga rekord ug usa ka PILI nga pahayag sa pagpangutana sa datos. Sa Amazon Kinesis Data Analytics, nagtrabaho ka sa mga sapa (STREAMs) ug pumps (PUMPs)β€”nagpadayon nga pagsal-ot sa mga hangyo nga nagsal-ot sa datos gikan sa usa ka sapa sa usa ka aplikasyon ngadto sa laing sapa.

Ang pangutana sa SQL nga gipresentar sa ibabaw nangita alang sa mga tiket sa Aeroflot sa kantidad nga ubos sa lima ka libo nga mga rubles. Ang tanang mga rekord nga nakab-ot niini nga mga kondisyon ibutang sa DESTINATION_SQL_STREAM stream.

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Sa Destination block, pilia ang special_stream stream, ug sa In-application stream name DESTINATION_SQL_STREAM drop-down list:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Ang resulta sa tanang manipulasyon kinahanglang susama sa hulagway sa ubos:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano

Paghimo ug pag-subscribe sa usa ka hilisgutan sa SNS

Adto sa Simple Notification Service ug paghimo og bag-ong hilisgutan didto nga adunay ngalan nga Airlines:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Mag-subscribe sa kini nga hilisgutan ug ipakita ang numero sa mobile phone diin ipadala ang mga abiso sa SMS:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano

Paghimo og lamesa sa DynamoDB

Aron matipigan ang hilaw nga datos gikan sa ilang airline_tickets stream, maghimo ta ug lamesa sa DynamoDB nga adunay parehas nga ngalan. Atong gamiton ang record_id isip panguna nga yawe:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano

Paghimo usa ka kolektor sa function sa lambda

Magbuhat ta ug lambda function nga gitawag ug Collector, kansang tahas mao ang pag-poll sa airline_tickets stream ug, kung makit-an ang bag-ong mga rekord didto, isulod kini nga mga rekord sa lamesa sa DynamoDB. Dayag nga, dugang sa default nga mga katungod, kini nga lambda kinahanglan nga adunay pagbasa nga pag-access sa Kinesis data stream ug pagsulat sa pag-access sa DynamoDB.

Paghimo og IAM nga papel para sa collector lambda function
Una, maghimo kita og bag-ong papel sa IAM alang sa lambda nga ginganlan og Lambda-TicketsProcessingRole:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Alang sa panig-ingnan sa pagsulay, ang na-pre-configure nga AmazonKinesisReadOnlyAccess ug AmazonDynamoDBFullAccess nga mga palisiya angayan, sama sa gipakita sa litrato sa ubos:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano

Kini nga lambda kinahanglan nga gilansad pinaagi sa usa ka gatilyo gikan sa Kinesis kung ang mga bag-ong entry mosulod sa airline_stream, mao nga kinahanglan namon nga magdugang usa ka bag-ong gatilyo:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Ang nahabilin mao ang pagsulud sa code ug i-save ang lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Paghimo og lambda function notifier

Ang ikaduha nga function sa lambda, nga mag-monitor sa ikaduha nga sapa (special_stream) ug magpadala usa ka pahibalo sa SNS, gihimo sa parehas nga paagi. Busa, kini nga lambda kinahanglan adunay access sa pagbasa gikan sa Kinesis ug magpadala mga mensahe sa usa ka gihatag nga hilisgutan sa SNS, nga ipadala sa serbisyo sa SNS sa tanan nga mga subscriber niini nga hilisgutan (email, SMS, ug uban pa).

Paghimo usa ka tahas sa IAM
Una, gimugna namo ang papel sa IAM nga Lambda-KinesisAlarm para niining lambda, ug dayon i-assign kini nga papel sa alarm_notifier lambda nga gimugna:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano

Kini nga lambda kinahanglan molihok sa usa ka gatilyo alang sa bag-ong mga rekord nga mosulod sa special_stream, mao nga kinahanglan nimo nga i-configure ang gatilyo sa parehas nga paagi sama sa among gibuhat alang sa Collector lambda.

Aron mas sayon ​​ang pag-configure niini nga lambda, atong ipaila ang bag-ong environment variable - TOPIC_ARN, diin atong ibutang ang ANR (Amazon Recourse Names) sa hisgutanan sa Airlines:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Ug isulud ang lambda code, dili kini komplikado:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Morag dinhi nahuman ang pag-configure sa manual nga sistema. Ang nahabilin mao ang pagsulay ug pagsiguro nga na-configure namon ang tanan sa husto.

I-deploy gikan sa Terraform code

Gikinahanglan nga pagpangandam

Terraform mao ang usa ka kombenyente kaayo nga open-source nga himan alang sa pagdeploy sa imprastraktura gikan sa code. Kini adunay kaugalingon nga syntax nga dali makat-on ug adunay daghang mga pananglitan kung giunsa ug unsa ang i-deploy. Ang editor sa Atom o Visual Studio Code adunay daghang magamit nga mga plugins nga nagpadali sa pagtrabaho sa Terraform.

Mahimo nimong i-download ang pag-apod-apod gikan dinhi. Ang usa ka detalyado nga pagtuki sa tanang kapabilidad sa Terraform kay lapas pa sa kasangkaran niini nga artikulo, busa limitahan nato ang atong kaugalingon sa mga nag-unang punto.

Unsaon pagsugod

Ang bug-os nga code sa proyekto mao ang sa akong repository. Gi-clone namo ang repository sa among kaugalingon. Sa dili pa magsugod, kinahanglan nimong siguroon nga na-install ug na-configure ang AWS CLI, tungod kay... Pangitaon sa Terraform ang mga kredensyal sa ~/.aws/credentials file.

Ang usa ka maayong praktis mao ang pagpadagan sa sugo sa plano sa dili pa i-deploy ang tibuok nga imprastraktura aron makita kung unsa ang gihimo karon sa Terraform alang kanato sa panganod:

terraform.exe plan

Maaghat ka sa pagsulod sa numero sa telepono aron ipadala ang mga pahibalo. Dili kinahanglan nga mosulod niini sa kini nga yugto.

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Sa pag-analisar sa plano sa operasyon sa programa, makasugod kami sa paghimo og mga kapanguhaan:

terraform.exe apply

Human ipadala kini nga sugo, hangyoon ka pag-usab sa pagsulod sa numero sa telepono; i-dial ang "oo" kung ang usa ka pangutana bahin sa aktuwal nga pagbuhat sa mga aksyon gipakita. Gitugotan ka niini nga i-set up ang tibuuk nga imprastraktura, ipatuman ang tanan nga kinahanglan nga pag-configure sa EC2, i-deploy ang mga function sa lambda, ug uban pa.

Pagkahuman sa tanan nga mga kapanguhaan nga malampuson nga nahimo pinaagi sa Terraform code, kinahanglan nimo nga moadto sa mga detalye sa aplikasyon sa Kinesis Analytics (sa kasubo, wala nako nakit-an kung giunsa kini buhaton direkta gikan sa code).

Ilunsad ang aplikasyon:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Pagkahuman niini, kinahanglan nimo nga klaro nga itakda ang ngalan sa stream sa aplikasyon pinaagi sa pagpili gikan sa lista sa drop-down:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Karon ang tanan andam na sa pag-adto.

Pagsulay sa aplikasyon

Dili igsapayan kung giunsa nimo pag-deploy ang sistema, mano-mano o pinaagi sa code sa Terraform, parehas kini nga molihok.

Nag-log in kami pinaagi sa SSH sa EC2 virtual machine diin gi-install ang Kinesis Agent ug gipadagan ang api_caller.py script

sudo ./api_caller.py TOKEN

Ang kinahanglan nimong buhaton mao ang paghulat sa usa ka SMS sa imong numero:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
SMS - usa ka mensahe moabut sa imong telepono sa hapit 1 minuto:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano
Nagpabilin kini aron makita kung ang mga rekord natipig sa database sa DynamoDB alang sa sunod, mas detalyado nga pagtuki. Ang talaan sa airline_tickets adunay gibana-bana nga mosunod nga datos:

Ang Aviasales API integration sa Amazon Kinesis ug walay server nga kayano

konklusyon

Sa dagan sa trabaho nga nahimo, usa ka online nga sistema sa pagproseso sa datos ang gitukod base sa Amazon Kinesis. Gikonsiderar ang mga opsyon sa paggamit sa Kinesis Agent inubanan sa Kinesis Data Streams ug real-time analytics Kinesis Analytics gamit ang SQL commands, ingon man ang interaksyon sa Amazon Kinesis sa ubang mga serbisyo sa AWS.

Gi-deploy namo ang sistema sa ibabaw sa duha ka paagi: usa ka taas nga manwal ug usa ka dali gikan sa Terraform code.

Ang tanang project source code anaa sa akong GitHub repository, Gisugyot ko nga pamilyar ka niini.

Nalipay ako sa paghisgot sa artikulo, nagpaabut ako sa imong mga komento. Nanghinaut ko alang sa makaayo nga pagsaway.

Manghinaut ko nga molampos ka!

Source: www.habr.com

Idugang sa usa ka comment