Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server

Hey Habr!

Ma hûn ji firîna balafiran hez dikin? Ez jê hez dikim, lê di dema xwe veqetandinê de ez jî evîndar bûm ji analîzkirina daneyên li ser bilêtên hewayê ji çavkaniyek navdar - Aviasales.

Îro em ê xebata Amazon Kinesis analîz bikin, bi analîtîkên rast-dem re pergalek streaming ava bikin, databasa Amazon DynamoDB NoSQL wekî hilanîna daneya sereke saz bikin, û ji bo bilêtên balkêş agahdariya SMS saz bikin.

Hemî hûrgulî di bin qutbûnê de ne! Ajotin!

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server

Pîrozbahiyê

Ji bo nimûne, pêdivî ye ku em bigihîjin Aviasales API. Gihîştina wê bêpere û bê sînor tê peyda kirin; hûn tenê hewce ne ku di beşa "Pêşdebiran" de qeyd bikin da ku tokena API-ya xwe bistînin da ku bigihîjin daneyan.

Armanca sereke ya vê gotarê ev e ku meriv têgihîştinek giştî ya karanîna weşana agahdarî di AWS-ê de bide; em digirin ber çavan ku daneyên ku ji hêla API-ya hatî bikar anîn ve hatî vegerandin bi hişkî ne nûvekirî ne û ji cache-ê tê veguheztin. li ser bingeha lêgerînên bikarhênerên malperên Aviasales.ru û Jetradar.com yên 48 demjimêrên dawîn hatine çêkirin.

Kinesis-agent, ku li ser makîneya hilberîner hatî saz kirin, bi API-ê ve hatî wergirtin, dê bixweber bi navgîniya Kinesis Data Analytics ve daneyan ji herika xwestinê re parve bike û bişîne. Guhertoya xav a vê çemê dê rasterast ji firotgehê re were nivîsandin. Hilberîna daneya xav a ku li DynamoDB-ê hatî bicîh kirin dê rê bide analîza bilêtê ya kûrtir bi navgîniya amûrên BI, wek AWS Quick Sight.

Em ê du vebijarkan ji bo bicîhkirina tevahiya binesaziyê binirxînin:

  • Manual - bi rêya Konsolê Rêveberiya AWS;
  • Binesaziya ji koda Terraform ji bo otomatên tembel e;

Mîmariya pergala pêşkeftî

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Pêkhateyên bikaranîn:

  • Aviasales API - Daneyên ku ji hêla vê API-ê ve hatî vegerandin dê ji bo hemî karên paşîn werin bikar anîn;
  • Mînaka Hilberînerê EC2 - makîneyek virtual ya birêkûpêk di ewr de ku dê pêveka daneya têketinê were çêkirin:
    • Kinesis Agent serîlêdanek Java-yê ye ku bi herêmî li ser makîneyê hatî saz kirin ku rêyek hêsan peyda dike ku daneyan berhev bike û bişîne Kinesis (Kinesis Data Streams an Kinesis Firehose). Ajan bi berdewamî komek pelan di pelrêçên diyarkirî de dişopîne û daneyên nû ji Kinesis re dişîne;
    • API Caller Script - Skrîpteke Python ku daxwazan ji API-yê re dike û bersivê dixe peldankek ku ji hêla Kinesis Agent ve tê şopandin;
  • Kinesis Data Streams - karûbarê weşana daneya rast-a-dem bi kapasîteyên pîvandinê yên berfireh;
  • Kinesis Analytics karûbarek bê server e ku analîzkirina daneya weşana di wextê rast de hêsan dike. Amazon Kinesis Data Analytics çavkaniyên serîlêdanê mîheng dike û bixweber pîvaz dike da ku her hejmûna daneya gihîştî bigire;
  • AWS Lambda - karûbarek ku destûrê dide te ku hûn kodê bêyî piştgirîkirin an sazkirina pêşkêşkeran bimeşînin. Hemî hêza hesabkirinê ji bo her bangekê bixweber tê pîvandin;
  • Amazon DynamoDB - Databasek ji cot û belgeyên key-nirxê ku dema ku di her pîvanê de dimeşe derengiya kêmtir ji 10 milî çirkeyan peyda dike. Dema ku DynamoDB bikar tînin, hûn ne hewce ne ku serverek peyda bikin, patch bikin, an rêvebirin. DynamoDB bixweber tabloyan hûr dike da ku mîqdara çavkaniyên berdest rast bike û performansa bilind biparêze. Ne rêveberiya pergalê hewce ye;
  • Amazon SNS - karûbarek bi tevahî rêvekirî ji bo şandina peyaman bi karanîna modela weşanger-abonet (Pub/Sub), ku pê re hûn dikarin mîkroxizmet, pergalên belavkirî û sepanên bê server veqetînin. SNS dikare were bikar anîn da ku agahdariya ji bikarhênerên dawîn re bi navgîniya ragihandinên push mobîl, peyamên SMS û e-nameyê bişîne.

Perwerdehiya destpêkê

Ji bo ku herikîna daneyê teqlîd bikim, min biryar da ku agahdariya bilêta balafirê ya ku ji hêla Aviasales API ve hatî vegerandin bikar bînim. LI belgekirin navnîşek pir berfireh a rêbazên cihêreng, em yek ji wan bigirin - "Salnameya Bihayê Mehane", ku bihayên ji bo her rojên mehê vedigerîne, li gorî hejmara veguheztinan têne kom kirin. Ger hûn meha lêgerînê di daxwaznameyê de diyar nekin, dê agahdarî ji bo meha li dû ya heyî were vegerandin.

Ji ber vê yekê, bila em qeyd bikin û nîşana xwe bistînin.

Daxwazek nimûne li jêr e:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Rêbaza jorîn a wergirtina daneyan ji API-ê bi destnîşankirina nîşanek di daxwaznameyê de dê bixebite, lê ez tercîh dikim ku tokena gihîştinê di serî de derbas bikim, ji ber vê yekê em ê vê rêbazê di skrîpta api_caller.py de bikar bînin.

Mînak bersiv:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Nimûneya bersiva API-ê ya jorîn bilêtek ji St.
Ji ber ku ez ji Kazan im, û Phuket naha "tenê xewnek" e, em li bilêtên ji St.

Ew texmîn dike ku we berê hesabek AWS heye. Ez dixwazim tavilê balê bikişînim ser vê yekê ku Kinesis û şandina agahdariya bi SMS-ê di nav salnameya salane de ne Rêzeya belaş (bikaranîna belaş). Lê tevî vê yekê jî, bi çend dolaran di hişê xwe de, pir gengaz e ku meriv pergala pêşniyarkirî ava bike û pê re bilîze. Û, bê guman, ji bîr nekin ku piştî ku ew êdî hewce nebin, hemî çavkaniyan jêbirin.

Xwezî, fonksiyonên DynamoDb û lambda dê ji me re belaş bin heke em sînorên xweyên belaş ên mehane bicîh bînin. Mînakî, ji bo DynamoDB: 25 GB hilanînê, 25 WCU / RCU û 100 mîlyon pirs. Û mehê mîlyonek fonksiyona lambda bang dike.

Bicihkirina sîstema Manual

Sazkirina Daneyên Kinesis Streams

Werin em biçin servîsa Kinesis Data Streams û du tîrêjên nû, ji bo her yekê şûrek biafirînin.

Şirik çi ye?
A shard yekîneya veguheztina daneya bingehîn a çemek Amazon Kinesis e. Yek beş veguheztina daneya têketinê bi leza 1 MB / s û veguheztina daneya derketinê bi leza 2 MB / s peyda dike. Yek beş di çirkeyê de heya 1000 têketinên PUT piştgirî dike. Dema ku hûn tîrêjek daneyê diafirînin, hûn hewce ne ku hejmara beşan hewcedar diyar bikin. Mînakî, hûn dikarin bi du beşan ve hilberek daneyê biafirînin. Ev herikîna daneyê dê veguheztina daneya têketinê bi 2 MB / s û veguheztina daneya derketinê bi 4 MB / s peyda bike, di çirkeyê de heya 2000 tomarên PUT piştgirî dike.

Di herikîna we de çi qas şikestî hebe, berbi wê jî mezintir dibe. Di prensîbê de, herikîn bi vî rengî têne pîvan kirin - bi lêzêdekirina şikestî. Lê çi qas şûşeyên we hebin, ew qas bihayê wê bilindtir dibe. Ji bo her mîlyon yekeyên bargiraniya PUT-ê her perçeyek 1,5 cent û 1.4 centên din jî lê dike.

Werin em bi navî çîçek nû ava bikin airline_billets, 1 şûşeyek ji wî re bes e:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Naha em bi navî mijarek din ava bikin special_stream:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server

Sazkirina hilberîner

Ji bo analîzkirina karekî, bes e ku meriv mînakek EC2 ya birêkûpêk wekî hilberînerek daneyê bikar bîne. Ne hewce ye ku ew makîneyek virtual ya hêzdar û biha be; cîhek t2.micro dê baş bike.

Nîşeya girîng: Mînakî, divê hûn wêneyê bikar bînin - Amazon Linux AMI 2018.03.0, ji bo destpêkirina bilez Kinesis Agent kêmtir mîhengên wê hene.

Herin ser karûbarê EC2, makîneyek virtual ya nû biafirînin, AMI-ya xwestî ya bi tîpa t2.micro, ku di Rêjeya Belaş de tê de ye, hilbijêrin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Ji bo ku makîneya virtual ya ku nû hatî afirandin bikaribe bi karûbarê Kinesis re têkildar be, divê mafên vê yekê jê re were dayîn. Awayê çêtirîn ku meriv vê yekê bike ev e ku meriv Rola IAM-ê bide. Ji ber vê yekê, li ser ekrana Gav 3: Veavakirina Detayên Mînakê, divê hûn hilbijêrin Rola IAM-a nû biafirînin:

Afirandina rola IAM ji bo EC2
Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Di pencereya ku vedibe de, hilbijêrin ku em ji bo EC2 rolek nû diafirînin û biçin beşa Destûran:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Bi karanîna mînaka perwerdehiyê, ne hewce ye ku em bikevin nav hemî tevliheviyên mîhengên hûrgulî yên mafên çavkaniyê, ji ber vê yekê em ê polîtîkayên ku ji hêla Amazon-ê ve hatine pêş-avakirin hilbijêrin: AmazonKinesisFullAccess û CloudWatchFullAccess.

Ka em hin navekî watedar bidin vê rola, mînakî: EC2-KinesisStreams-FullAccess. Divê encam heman be ku di wêneya jêrîn de tê nîşandan:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Piştî afirandina vê rola nû, ji bîr nekin ku wê bi mînaka makîneya virtual ya hatî afirandin ve girêbidin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Em li ser vê ekranê tiştek din naguherînin û diçin pencereyên din.

Mîhengên dîska hişk dikare wekî xwerû were hiştin, û her weha etîketan (her çend pratîkek baş e ku meriv etîketan bikar bîne, bi kêmanî navekî bide nimûneyê û jîngehê destnîşan bike).

Naha em li ser Gav 6 in: Tabloya Koma Ewlekariyê Mîheng bikin, ku hûn hewce ne ku yek nû biafirînin an koma xweya Ewlekariya heyî diyar bikin, ku destûrê dide te ku hûn bi ssh (port 22) bi nimûneyê ve girêbidin. Çavkanî -> IP-ya min li wir hilbijêrin û hûn dikarin nimûneyê bidin destpêkirin.

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Mîna ku ew veguhere rewşa xebitandinê, hûn dikarin hewl bidin ku bi ssh ve pê ve girêbidin.

Ji bo ku hûn bikaribin bi Kinesis Agent re bixebitin, piştî ku bi serfirazî bi makîneyê ve girêdidin, divê hûn di termînalê de emrên jêrîn têkevin:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Ka em peldankek biafirînin ku bersivên API-ê hilînin:

sudo mkdir /var/log/airline_tickets

Berî ku hûn nûnerê dest pê bikin, hûn hewce ne ku konfigurasyona wê mîheng bikin:

sudo vim /etc/aws-kinesis/agent.json

Naveroka pelê agent.json divê bi vî rengî xuya bike:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Wekî ku ji pelê veavakirinê tê dîtin, ajan dê pelên bi dirêjkirina .log-ê di pelrêça /var/log/airline_tickets/ de bişopîne, wan parsek bike û veguhezîne herika airline_tickets.

Em karûbar ji nû ve dest pê dikin û pê ewle ne ku ew bi rê ve dibe:

sudo service aws-kinesis-agent restart

Naha em skrîpta Python dakêşin ku dê daneyan ji API-yê bixwaze:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skrîpta api_caller.py daneyan ji Aviasales daxwaz dike û bersiva wergirtî di pelrêça ku nûnerê Kinesis lê dikole hilîne. Pêkanîna vê skrîptê pir standard e, çînek TicketsApi heye, ew dihêle hûn API-ê asynkron bikişîne. Em sernivîsek bi nîşanek derbas dikin û pîvanan ji vê polê re daxwaz dikin:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Ji bo ceribandina mîhengên rast û fonksiyona agentê, em skrîpta api_caller.py biceribînin:

sudo ./api_caller.py TOKEN

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Û em li encama xebatê di têketinên Agent û li ser tabloya Çavdêriyê ya di herikîna daneya firoke_bilêtan de dinêrin:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Wekî ku hûn dikarin bibînin, her tişt dixebite û Kinesis Agent bi serfirazî daneyan dişîne ser çemê. Naha werin em xerîdar mîheng bikin.

Sazkirina Kinesis Data Analytics

Ka em biçin beşa navendî ya tevahiya pergalê - di Kinesis Data Analytics de serîlêdanek nû bi navê kinesis_analytics_airlines_app biafirînin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Kinesis Data Analytics destûrê dide te ku hûn bi karanîna zimanê SQL analîzên daneya rast-ê ji Kinesis Streams pêk bînin. Ew karûbarek bi tevahî otomatîkî ye (bervajî Kinesis Streams) ku:

  1. destûrê dide te ku hûn li ser bingeha daxwazên daneyên çavkaniyê pêlên nû (Output Stream) biafirînin;
  2. streamek bi xeletiyên ku di dema xebitandina sepanan de derketine peyda dike (Error Stream);
  3. dikare bixweber nexşeya daneya têketinê diyar bike (heke hewce bike ew dikare bi destan ji nû ve were pênase kirin).

Ev ne karûbarek erzan e - 0.11 USD serê saetekê kar, ji ber vê yekê divê hûn wê bi baldarî bikar bînin û gava ku hûn qediyan jêbirin.

Ka em serîlêdanê bi çavkaniya daneyê ve girêdin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Roka ku em ê pê ve girêbidin hilbijêrin (bilêtên_firînê):

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Dûv re, hûn hewce ne ku Rola IAM-ê ya nû ve girêbidin da ku serîlêdan bikaribe ji çemê bixwîne û li ser çemê binivîse. Ji bo vê yekê, bes e ku hûn di bloka destûrên Gihîştinê de tiştek neguherînin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Naha em daxwaza vedîtina şemaya daneyê ya di çemê de bikin; ji bo vê yekê, li ser bişkoka "Şemaya Vedîtin" bikirtînin. Wekî encamek, dê rola IAM-ê were nûve kirin (yek nû dê were afirandin) û vedîtina şemayê dê ji daneyên ku berê gihîştine rêyê were destpêkirin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Naha hûn hewce ne ku biçin edîtorê SQL. Dema ku hûn li ser vê bişkojkê bikirtînin, dê pencereyek xuya bibe ku ji we dipirse ku hûn serîlêdanê bidin destpêkirin - tiştê ku hûn dixwazin dest pê bikin hilbijêrin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Pirsa hêsan a jêrîn têxe pencereya edîtorê SQL û bikirtînin Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Di databasên pêwendiyê de, hûn bi tabloyan re dixebitin ku daxuyaniyên INSERT bikar bînin da ku tomar û daxuyaniyek SELECT ji bo lêpirsîna daneyan zêde bikin. Di Amazon Kinesis Data Analytics de, hûn bi çeman (STREAM) û pompeyan (PUMP) re dixebitin - daxwazên têxê yên domdar ên ku daneya ji herikekê di serîlêdanekê de di nav çemek din de vedihewînin.

Pirsa SQL ku li jor hatî pêşkêş kirin li bilêtên Aeroflot bi lêçûnek di binê pênc hezar rubleyan de digere. Hemî tomarên ku van şertan bi cih tînin dê di herika DESTINATION_SQL_STREAM de werin danîn.

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Di bloka Destê de, tîrêja special_stream hilbijêrin, û di navnîşa dakêşana nav-serlêdanê de DESTINATION_SQL_STREAM:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Encama hemî manipulasyonan divê tiştek mîna wêneya jêrîn be:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server

Afirandin û abonetiya mijarek SNS

Herin Karûbarê Agahdariya Hêsan û li wir bi navê Airlines mijarek nû biafirînin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Aboneyê vê mijarê bibin û jimara têlefona desta ya ku dê agahdariya SMS jê re werin şandin destnîşan bikin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server

Di DynamoDB de tabloyek çêbikin

Ji bo hilanîna daneyên xav ji hêlîna airline_tickets-a wan, werin em di DynamoDB de tabloyek bi heman navî biafirînin. Em ê record_id wekî mifteya bingehîn bikar bînin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server

Afirandina berhevkarek fonksiyona lambda

Werin em fonksiyonek lambda ya bi navê Collector biafirînin, ku peywira wê dê ev be ku li ser tîrêja_bilêtên balafirê rapirsî bike û, ger tomarên nû li wir hatin dîtin, van tomaran têxin tabloya DynamoDB. Eşkere ye, ji bilî mafên xwerû, ev lambda pêdivî ye ku xwendibe gihandina daneya Kinesis-ê û gihîştina DynamoDB-ê binivîse.

Afirandina rola IAM-ê ji bo fonksiyona lambda berhevkar
Pêşîn, bila em ji bo lambda bi navê Lambda-TicketsProcessingRole rolek IAM-a nû biafirînin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Ji bo nimûneya testê, polîtîkayên AmazonKinesisReadOnlyAccess û AmazonDynamoDBFullAccess-ya pêş-sazkirî pir maqûl in, wekî ku di wêneya jêrîn de tê xuyang kirin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server

Dema ku têketinên nû têkevin airline_stream divê ev lambda ji hêla tetikek ji Kinesis ve were destpêkirin, ji ber vê yekê em hewce ne ku tetikek nû lê zêde bikin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Tiştê ku dimîne ev e ku meriv kodê têxe û lambda xilas bike.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Afirandina agahdariyek fonksiyona lambda

Fonksiyona lambda ya duyemîn, ku dê herika duyemîn (special_stream) bişopîne û agahdariyek ji SNS re bişîne, bi heman rengî tête çêkirin. Ji ber vê yekê, pêdivî ye ku ev lambda bigihîje xwendina ji Kinesis û şandina peyaman ji mijarek SNS ya diyarkirî re, ku dûv re dê ji hêla karûbarê SNS ve ji hemî aboneyên vê mijarê re were şandin (e-name, SMS, hwd.).

Afirandina rola IAM
Pêşî, em ji bo vê lambda rola IAM-ê Lambda-KinesisAlarm diafirînin, û dûv re jî vê rolê ji alarm_notifier lambda-ya ku tê afirandin re destnîşan dikin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server

Pêdivî ye ku ev lambda li ser tetikek bixebite ku tomarên nû têkevin navherikê special_stream, ji ber vê yekê hûn hewce ne ku tetikê bi heman awayê ku me ji bo lambdaya Berhevkar kir mîheng bikin.

Ji bo ku hûn mîhengkirina vê lambda hêsantir bikin, werin em guhêrbarek jîngehê ya nû destnîşan bikin - TOPIC_ARN, ku em li wir ANR (Navên Çavdêriya Amazon) ya mijara Rêhewayê bi cih dikin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Û koda lambda têxin, ew qet ne tevlihev e:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Wusa dixuye ku li vir veavakirina pergala destan qediya ye. Tiştê ku dimîne ev e ku em ceribandin û piştrast bikin ku me her tişt rast mîheng kiriye.

Ji koda Terraform saz bikin

Amadekirina pêwîst

Terraform ji bo bicihkirina binesaziya ji kodê amûrek çavkaniya vekirî ya pir hêsan e. Ew hevoksaziya xwe heye ku fêrbûna wê hêsan e û gelek mînakên wê hene ka meriv çawa û çi bicîh bike. Edîtorê Atom an Visual Studio Code gelek pêvekên kêrhatî hene ku xebata bi Terraform re hêsantir dike.

Hûn dikarin belavkirinê dakêşin ji vir. Analîzek berfireh a hemî kapasîteyên Terraform li derveyî çarçoweya vê gotarê ye, ji ber vê yekê em ê xwe bi xalên sereke re sînordar bikin.

Meriv çawa dest pê dike

Koda tevahî ya projeyê ye di depoya min de. Em depoyê ji xwe re klon dikin. Berî ku hûn dest pê bikin, hûn hewce ne ku pê ewle bin ku we AWS CLI saz kiriye û mîheng kiriye, ji ber ku ... Terraform dê di pelê ~/.aws/credentials de li pêbaweran bigere.

Pratîkek baş ev e ku meriv emrê planê berî ku tevahî binesaziyê bicîh bike bimeşîne da ku bibînin ka Terraform niha ji me re di ewr de diafirîne:

terraform.exe plan

Dê ji we were xwestin ku hûn jimareyek têlefonê têkevin da ku agahdariyan jê re bişînin. Di vê qonaxê de ne hewce ye ku têkevin.

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Piştî ku plansaziya xebata bernameyê analîz kirin, em dikarin dest bi afirandina çavkaniyan bikin:

terraform.exe apply

Piştî şandina vê fermanê, dê dîsa ji we were xwestin ku hûn jimareyek têlefonê binivîsin; gava ku pirsek di derbarê pêkanîna çalakiyan de tê xuyang kirin "erê" bikire. Ev ê dihêle hûn binesaziya tevahî saz bikin, hemî veavakirina EC2-ya pêwîst pêk bînin, fonksiyonên lambda bicîh bikin, hwd.

Piştî ku hemî çavkanî bi serfirazî bi koda Terraform ve hatin afirandin, hûn hewce ne ku hûn biçin hûrguliyên serîlêdana Kinesis Analytics (mixabin, min nedît ku meriv çawa vê yekê rasterast ji kodê bikim).

Serlêdanê dest pê bikin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Piştî vê yekê, divê hûn bi bijartina ji navnîşa dakêşanê bi eşkere navê herika nav-serlêdanê destnîşan bikin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Niha her tişt amade ye ku here.

Testkirina sepanê

Her çi qas we pergalê, bi destan an bi koda Terraform-ê saz kiriye, ew ê bi heman rengî bixebite.

Em bi SSH-ê têkevin makîneya virtual EC2 ya ku Kinesis Agent lê hatî saz kirin û skrîpta api_caller.py dimeşîne.

sudo ./api_caller.py TOKEN

Tiştê ku divê hûn bikin ev e ku li benda SMSek ji bo hejmara xwe bisekinin:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
SMS - di nav 1 hûrdeman de peyamek têlefonê tê:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server
Dimîne ku em bibînin ka tomar di databasa DynamoDB de ji bo analîzên paşerojê, hûrgulî hatine tomar kirin. Tabloya bilêtên_firînê bi qasî daneyên jêrîn vedihewîne:

Yekbûna Aviasales API bi Amazon Kinesis û sadebûna bê server

encamê

Di çarçoveya xebata hatî kirin de, pergalek hilberandina daneya serhêl li ser bingeha Amazon Kinesis hate çêkirin. Vebijarkên ji bo karanîna Kinesis Agent digel Daneyên Kinesis Streams û analîtîkên rastîn ên Kinesis Analytics bi karanîna fermanên SQL, û her weha pêwendiya Amazon Kinesis bi karûbarên din ên AWS re hate hesibandin.

Me pergala jorîn bi du awayan bicîh kir: yek destanek pir dirêj û ya bilez ji koda Terraform.

Hemî koda çavkaniya projeyê heye di depoya min a GitHub de, Ez pêşniyar dikim ku hûn xwe pê nas bikin.

Ez kêfxweş im ku gotarê nîqaş bikim, ez li benda şîroveyên we me. Ez ji bo rexneyên çêker hêvî dikim.

Ez hêvî dikim serkeftî!

Source: www.habr.com

Add a comment