Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless

Ħej Habr!

Tħobb ittajjar ajruplani? Inħobbha, iżda waqt l-iżolament jien inħobb ukoll bl-analiżi tad-dejta dwar il-biljetti tal-ajru minn riżorsa waħda magħrufa - Aviasales.

Illum se nanalizzaw ix-xogħol ta 'Amazon Kinesis, nibnu sistema ta' streaming b'analiżi f'ħin reali, ninstallaw id-database Amazon DynamoDB NoSQL bħala l-ħażna ewlenija tad-dejta, u nwaqqfu notifiki SMS għal biljetti interessanti.

Id-dettalji kollha huma taħt il-qatgħa! Mur!

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless

Introduzzjoni

Għall-eżempju, għandna bżonn aċċess għal Aviasales API. L-aċċess għalih huwa pprovdut mingħajr ħlas u mingħajr restrizzjonijiet; għandek bżonn biss tirreġistra fit-taqsima "Iżviluppaturi" biex tirċievi t-token tal-API tiegħek biex taċċessa d-dejta.

L-għan ewlieni ta’ dan l-artikolu huwa li jagħti fehim ġenerali tal-użu tal-istrimjar tal-informazzjoni fl-AWS; nqisu li d-dejta rritornata mill-API użata mhix strettament aġġornata u tiġi trasferita mill-cache, li hija iffurmat ibbażat fuq tfittxijiet minn utenti tas-siti Aviasales.ru u Jetradar.com għall-aħħar 48 siegħa.

Kinesis-agent, installat fuq il-magna li tipproduċi, riċevut permezz tal-API awtomatikament se parse u jittrażmetti d-data għall-fluss mixtieq permezz Kinesis Data Analytics. Il-verżjoni mhux ipproċessata ta 'dan il-fluss se tinkiteb direttament lill-maħżen. Il-ħażna tad-dejta mhux ipproċessata skjerata f'DynamoDB se tippermetti analiżi aktar profonda tal-biljetti permezz ta' għodod tal-BI, bħal AWS Quick Sight.

Se nikkunsidraw żewġ għażliet għall-iskjerament tal-infrastruttura kollha:

  • Manwal - permezz tal-AWS Management Console;
  • Infrastruttura mill-kodiċi Terraform hija għal awtomaturi għażżien;

Arkitettura tas-sistema żviluppata

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Komponenti użati:

  • Aviasales API — id-dejta rritornata minn din l-API se tintuża għax-xogħol sussegwenti kollu;
  • Istanza tal-Produttur EC2 — magna virtwali regolari fil-cloud li fuqha se jiġi ġġenerat il-fluss tad-dejta tal-input:
    • Kinesis Agent hija applikazzjoni Java installata lokalment fuq il-magna li tipprovdi mod faċli biex tiġbor u tibgħat dejta lil Kinesis (Kinesis Data Streams jew Kinesis Firehose). L-aġent jimmonitorja kontinwament sett ta 'fajls fid-direttorji speċifikati u jibgħat data ġdida lil Kinesis;
    • API Caller Script — Script Python li jagħmel talbiet lill-API u jpoġġi r-rispons f'folder li huwa mmonitorjat mill-Aġent Kinesis;
  • Kinesis Data Streams — servizz ta’ data streaming f’ħin reali b’kapaċitajiet ta’ skala wiesgħa;
  • Kinesis Analytics huwa servizz bla server li jissimplifika l-analiżi tad-dejta tal-istrimjar f'ħin reali. Amazon Kinesis Data Analytics tikkonfigura r-riżorsi tal-applikazzjoni u tiskala awtomatikament biex timmaniġġja kwalunkwe volum ta 'dejta li tidħol;
  • AWS Lambda — servizz li jippermettilek tħaddem kodiċi mingħajr ma tagħmel backup jew twaqqaf servers. Il-qawwa kollha tal-kompjuter hija awtomatikament skalata għal kull sejħa;
  • Amazon DynamoDB - Database ta' pari ta' valuri ewlenin u dokumenti li tipprovdi latenza ta' inqas minn 10 millisekondi meta taħdem fuq kwalunkwe skala. Meta tuża DynamoDB, m'għandekx bżonn tipprovdi, twaħħal jew tmexxi xi servers. DynamoDB awtomatikament iskala t-tabelli biex jaġġusta l-ammont ta 'riżorsi disponibbli u jżomm prestazzjoni għolja. L-ebda amministrazzjoni tas-sistema mhi meħtieġa;
  • Amazon SNS - servizz immaniġġjat bis-sħiħ biex jintbagħtu messaġġi bl-użu tal-mudell pubblikatur-abbonat (Pub/Sub), li bih tista’ tiżola mikroservizzi, sistemi distribwiti u applikazzjonijiet mingħajr server. L-SNS jista' jintuża biex jibgħat informazzjoni lill-utenti finali permezz ta' notifiki push mobbli, messaġġi SMS u emails.

Taħriġ inizjali

Biex nimita l-fluss tad-dejta, iddeċidejt li nuża l-informazzjoni tal-biljett tal-ajru mibgħuta lura mill-API Aviasales. IN dokumentazzjoni lista pjuttost estensiva ta 'metodi differenti, ejja nieħdu wieħed minnhom - "Kalendarju tal-Prezzijiet ta' Kull Xahar", li jirritorna l-prezzijiet għal kull jum tax-xahar, miġbura bin-numru ta 'trasferimenti. Jekk ma tispeċifikax ix-xahar tat-tfittxija fit-talba, l-informazzjoni tingħata lura għax-xahar ta' wara dak attwali.

Allura, ejja nirreġistraw u nġibu t-token tagħna.

Talba ta' eżempju hija hawn taħt:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Il-metodu ta 'hawn fuq biex tirċievi data mill-API billi tispeċifika token fit-talba se taħdem, imma nippreferi ngħaddi t-token ta' aċċess mill-header, għalhekk se nużaw dan il-metodu fl-iskrittura api_caller.py.

Eżempju ta' tweġiba:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

L-eżempju tar-rispons API hawn fuq juri biljett minn San Pietruburgu għal Phuk... Oh, x'ħolma...
Peress li jien minn Kazan, u Phuket issa hija "ħolma biss", ejja nfittxu biljetti minn San Pietruburgu għal Kazan.

Jassumi li diġà għandek kont AWS. Nixtieq immedjatament niġbed attenzjoni speċjali għall-fatt li Kinesis u li jintbagħtu notifiki permezz ta' SMS mhumiex inklużi fil-programm annwali. Livell b'xejn (użu b'xejn). Iżda anke minkejja dan, bi ftit dollari f'moħħu, huwa pjuttost possibbli li tinbena s-sistema proposta u tilgħab magħha. U, ovvjament, tinsiex tħassar ir-riżorsi kollha wara li ma jkunux aktar meħtieġa.

Fortunatament, il-funzjonijiet DynamoDb u lambda se jkunu b'xejn għalina jekk nilħqu l-limiti bla ħlas ta 'kull xahar tagħna. Pereżempju, għal DynamoDB: 25 GB ta 'ħażna, 25 WCU/RCU u 100 miljun mistoqsija. U miljun sejħiet tal-funzjoni lambda kull xahar.

Skjerament manwali tas-sistema

Twaqqif ta' Kinesis Data Streams

Ejja mmorru fis-servizz Kinesis Data Streams u noħolqu żewġ flussi ġodda, shard wieħed għal kull wieħed.

X'inhu shard?
A shard hija l-unità bażika tat-trasferiment tad-dejta ta 'fluss Amazon Kinesis. Segment wieħed jipprovdi trasferiment tad-dejta tad-dħul b'veloċità ta '1 MB/s u trasferiment tad-dejta tal-ħruġ b'veloċità ta' 2 MB/s. Segment wieħed jappoġġja sa 1000 dħul PUT kull sekonda. Meta toħloq fluss tad-dejta, trid tispeċifika n-numru meħtieġ ta 'segmenti. Pereżempju, tista 'toħloq fluss ta' data b'żewġ segmenti. Dan il-fluss tad-dejta se jipprovdi trasferiment tad-dejta tal-input b'2 MB/s u trasferiment tad-dejta tal-ħruġ f'4 MB/s, li jappoġġja sa 2000 rekord PUT kull sekonda.

Aktar ma jkun hemm frak fil-fluss tiegħek, iktar ikun kbir il-fluss tiegħu. Fil-prinċipju, dan huwa kif il-flussi huma skalati - billi jiżdiedu shards. Imma iktar ma jkollok frak, iktar ikun għoli l-prezz. Kull shard jiswa 1,5 ċenteżmi fis-siegħa u 1.4 ċenteżmi addizzjonali għal kull miljun unità ta 'tagħbija PUT.

Ejja noħolqu fluss ġdid bl-isem biljetti_ajru, shard 1 se jkun biżżejjed għalih:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Issa ejja noħolqu ħajt ieħor bl-isem special_stream:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless

Setup tal-produttur

Biex tanalizza kompitu, huwa biżżejjed li tuża istanza EC2 regolari bħala produttur tad-dejta. M'għandux għalfejn ikun magna virtwali qawwija u għalja; spot t2.micro se jagħmel tajjeb.

Nota importanti: pereżempju, għandek tuża immaġni - Amazon Linux AMI 2018.03.0, għandha inqas settings biex tniedi malajr l-Aġent Kinesis.

Mur fis-servizz EC2, oħloq magna virtwali ġdida, agħżel l-AMI mixtieqa bit-tip t2.micro, li hija inkluża fil-Livell Ħieles:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Sabiex il-magna virtwali maħluqa ġdida tkun tista' tinteraġixxi mas-servizz Kinesis, trid tingħata d-drittijiet biex tagħmel dan. L-aħjar mod biex tagħmel dan huwa li tassenja Rwol IAM. Għalhekk, fuq l-iskrin tal-Pass 3: Ikkonfigura Dettalji tal-Istanza, għandek tagħżel Oħloq Rwol IAM ġdid:

Il-ħolqien ta' rwol IAM għall-EC2
Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Fit-tieqa li tiftaħ, agħżel li qed noħolqu rwol ġdid għal EC2 u mur fit-taqsima Permessi:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Bl-użu tal-eżempju tat-taħriġ, m'għandniex għalfejn nidħlu fl-intricacies kollha tal-konfigurazzjoni granulari tad-drittijiet tar-riżorsi, għalhekk aħna nagħżlu l-politiki konfigurati minn qabel minn Amazon: AmazonKinesisFullAccess u CloudWatchFullAccess.

Ejja nagħtu xi isem sinifikanti għal dan ir-rwol, pereżempju: EC2-KinesisStreams-FullAccess. Ir-riżultat għandu jkun l-istess kif muri fl-istampa hawn taħt:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Wara li toħloq dan ir-rwol ġdid, tinsiex tehmeż mal-istanza tal-magna virtwali maħluqa:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Aħna ma nbiddlu xejn aktar fuq dan l-iskrin u ngħaddu għat-twieqi li jmiss.

Is-settings tal-hard drive jistgħu jitħallew bħala default, kif ukoll it-tikketti (għalkemm hija prattika tajba li tuża tikketti, għall-inqas agħti isem lill-istanza u indika l-ambjent).

Issa ninsabu fuq il-Pass 6: Ikkonfigura t-tab tal-Grupp tas-Sigurtà, fejn għandek bżonn toħloq waħda ġdida jew tispeċifika l-grupp tas-Sigurtà eżistenti tiegħek, li jippermettilek tikkonnettja permezz ta 'ssh (port 22) mal-istanza. Agħżel Sors -> IP tiegħi hemmhekk u tista 'tniedi l-istanza.

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Hekk kif jaqleb għall-istatus ta 'tħaddim, tista' tipprova tikkonnettja miegħu permezz ta 'ssh.

Biex tkun tista 'taħdem ma' Kinesis Agent, wara li tikkonnettja b'suċċess mal-magna, trid iddaħħal il-kmandi li ġejjin fit-terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Ejja noħolqu folder biex issalva t-tweġibiet tal-API:

sudo mkdir /var/log/airline_tickets

Qabel ma tibda l-aġent, trid tikkonfigura l-konfigurazzjoni tiegħu:

sudo vim /etc/aws-kinesis/agent.json

Il-kontenut tal-fajl agent.json għandu jidher bħal dan:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Kif jidher mill-fajl tal-konfigurazzjoni, l-aġent se jimmonitorja l-fajls bl-estensjoni .log fid-direttorju /var/log/airline_tickets/, janalizzahom u jittrasferihom fil-fluss airline_tickets.

Nibdew mill-ġdid is-servizz u niżguraw li jkun qed jaħdem:

sudo service aws-kinesis-agent restart

Issa ejja tniżżel l-iskrittura Python li se titlob dejta mill-API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

L-iskrittura api_caller.py jitlob dejta minn Aviasales u jiffranka r-rispons riċevut fid-direttorju li jiskennja l-aġent Kinesis. L-implimentazzjoni ta 'dan l-iskript hija pjuttost standard, hemm klassi TicketsApi, tippermettilek tiġbed l-API b'mod mhux sinkroniku. Aħna ngħaddu header b'token u nitolbu parametri lil din il-klassi:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Biex tittestja s-settings korretti u l-funzjonalità tal-aġent, ejja nittestjaw imexxu l-iskrittura api_caller.py:

sudo ./api_caller.py TOKEN

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
U nħarsu lejn ir-riżultat tax-xogħol fir-reġistri tal-Aġenti u fit-tab tal-Monitoraġġ fil-fluss tad-dejta airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Kif tistgħu taraw, kollox jaħdem u l-Aġent Kinesis jibgħat b'suċċess id-data lill-fluss. Issa ejja kkonfigurat konsumatur.

Twaqqif ta' Kinesis Data Analytics

Ejja ngħaddu għall-komponent ċentrali tas-sistema kollha - oħloq applikazzjoni ġdida f'Kinesis Data Analytics bl-isem kinesis_analytics_airlines_app:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Kinesis Data Analytics jippermettilek li twettaq analiżi tad-dejta f'ħin reali minn Kinesis Streams billi tuża l-lingwa SQL. Huwa servizz ta' autoscaling kompletament (b'differenza Kinesis Streams) li:

  1. jippermettilek toħloq flussi ġodda (Output Stream) ibbażati fuq talbiet għal dejta sors;
  2. jipprovdi fluss bi żbalji li seħħew waqt li l-applikazzjonijiet kienu qed jaħdmu (Error Stream);
  3. tista 'tiddetermina awtomatikament l-iskema tad-dejta tal-input (tista' tiġi definita mill-ġdid manwalment jekk meħtieġ).

Dan mhuwiex servizz irħis - 0.11 USD kull siegħa ta 'xogħol, għalhekk għandek tużah bir-reqqa u tħassarha meta tkun lest.

Ejja nqabbdu l-applikazzjoni mas-sors tad-dejta:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Agħżel il-fluss li se nikkonnettjaw miegħu (airline_tickets):

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Sussegwentement, trid tehmeż Rwol IAM ġdid sabiex l-applikazzjoni tkun tista' taqra mill-fluss u tikteb fil-fluss. Biex tagħmel dan, huwa biżżejjed li ma tbiddel xejn fil-blokk tal-permessi ta 'aċċess:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Issa ejja nitolbu skoperta tal-iskema tad-dejta fil-fluss; biex tagħmel dan, ikklikkja fuq il-buttuna "Skopri l-iskema". Bħala riżultat, ir-rwol tal-IAM se jiġi aġġornat (se jinħoloq wieħed ġdid) u sejra titnieda s-sejbien tal-iskema mid-dejta li tkun diġà waslet fil-fluss:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Issa għandek bżonn tmur għall-editur SQL. Meta tikklikkja fuq din il-buttuna, se tidher tieqa li titlobek tniedi l-applikazzjoni - agħżel dak li trid tniedi:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Daħħal il-mistoqsija sempliċi li ġejja fit-tieqa tal-editur SQL u kklikkja Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

F'databases relazzjonali, inti taħdem ma 'tabelli billi tuża dikjarazzjonijiet INSERT biex iżżid rekords u dikjarazzjoni SELECT biex titlob dejta. Fl-Amazon Kinesis Data Analytics, inti taħdem ma 'flussi (STREAMs) u pompi (PUMPs)—talbiet ta' daħħal kontinwi li jdaħħlu dejta minn fluss wieħed f'applikazzjoni f'fluss ieħor.

Il-mistoqsija SQL ippreżentata hawn fuq tfittex biljetti tal-Aeroflot bi spiża taħt il-ħamest elef rublu. Ir-rekords kollha li jissodisfaw dawn il-kundizzjonijiet se jitpoġġew fil-fluss DESTINATION_SQL_STREAM.

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Fil-blokk tad-Destinazzjoni, agħżel in-nixxiegħa special_stream, u fil-lista drop-down isem tal-fluss fl-applikazzjoni DESTINATION_SQL_STREAM:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Ir-riżultat tal-manipulazzjonijiet kollha għandu jkun xi ħaġa simili għall-istampa hawn taħt:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless

Ħolqien u abbonament għal suġġett SNS

Mur fis-Servizz ta' Notifika Sempliċi u oħloq suġġett ġdid hemmhekk bl-isem Airlines:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Abbona għal dan is-suġġett u indika n-numru tal-mowbajl li lejh jintbagħtu n-notifiki bl-SMS:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless

Oħloq tabella f'DynamoDB

Biex taħżen id-dejta mhux ipproċessata mill-fluss airline_tickets tagħhom, ejja noħolqu tabella f'DynamoDB bl-istess isem. Aħna se nużaw record_id bħala ċ-ċavetta primarja:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless

Ħolqien ta 'kollettur tal-funzjoni lambda

Ejja noħolqu funzjoni lambda msejħa Kollettur, li l-kompitu tagħha se jkun li tistħarreġ il-fluss airline_tickets u, jekk jinstabu rekords ġodda hemmhekk, daħħal dawn ir-rekords fit-tabella DynamoDB. Ovvjament, minbarra d-drittijiet default, din il-lambda għandu jkollha aċċess għall-qari għall-fluss tad-dejta Kinesis u aċċess għall-kitba għal DynamoDB.

Il-ħolqien ta' rwol IAM għall-funzjoni lambda tal-kollettur
L-ewwel, ejja noħolqu rwol IAM ġdid għal-lambda bl-isem Lambda-TicketsProcessingRole:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Għall-eżempju tat-test, il-politiki AmazonKinesisReadOnlyAccess u AmazonDynamoDBFullAccess ikkonfigurati minn qabel huma pjuttost adattati, kif muri fl-istampa hawn taħt:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless

Din il-lambda għandha tiġi mnedija minn grillu minn Kinesis meta daħliet ġodda jidħlu fl-airline_stream, għalhekk irridu nżidu grillu ġdid:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Li jibqa 'huwa li daħħal il-kodiċi u ssalva l-lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Ħolqien ta' notifikatur tal-funzjoni lambda

It-tieni funzjoni lambda, li se timmonitorja t-tieni fluss (special_stream) u tibgħat notifika lill-SNS, hija maħluqa b'mod simili. Għalhekk, din il-lambda għandu jkollha aċċess biex taqra minn Kinesis u tibgħat messaġġi lil suġġett SNS partikolari, li mbagħad jintbagħat mis-servizz SNS lill-abbonati kollha ta 'dan is-suġġett (email, SMS, eċċ.).

Il-ħolqien ta' rwol IAM
L-ewwel, noħolqu r-rwol IAM Lambda-KinesisAlarm għal din il-lambda, u mbagħad nassenja dan ir-rwol lil alarm_notifier lambda li qed tinħoloq:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless

Din il-lambda għandha taħdem fuq grillu għal rekords ġodda biex jidħlu fis-special_stream, għalhekk għandek bżonn tikkonfigura l-grillu bl-istess mod kif għamilna għal-lambda tal-Kollezzjonisti.

Biex tagħmilha aktar faċli biex tiġi kkonfigurata din il-lambda, ejja nintroduċu varjabbli ambjentali ġdida - TOPIC_ARN, fejn inpoġġu l-ANR (Ismijiet ta' Rikors ta' Amazon) tas-suġġett tal-Ajruplani:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
U daħħal il-kodiċi lambda, mhu kkumplikat xejn:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Jidher li dan huwa fejn titlesta l-konfigurazzjoni manwali tas-sistema. Li jibqa 'huwa li tittestja u niżguraw li kkonfigurajna kollox b'mod korrett.

Uża mill-kodiċi Terraform

Preparazzjoni meħtieġa

Terraform hija għodda open-source konvenjenti ħafna għall-iskjerament tal-infrastruttura mill-kodiċi. Għandha s-sintassi tagħha stess li hija faċli biex titgħallem u għandha ħafna eżempji ta 'kif u x'għandek tuża. L-editur Atom jew Visual Studio Code għandu ħafna plugins utli li jagħmlu l-ħidma ma 'Terraform aktar faċli.

Tista' tniżżel id-distribuzzjoni għalhekk. Analiżi dettaljata tal-kapaċitajiet kollha ta 'Terraform hija lil hinn mill-ambitu ta' dan l-artikolu, għalhekk se nillimitaw lilna nfusna għall-punti ewlenin.

Kif tibda

Il-kodiċi sħiħ tal-proġett huwa fir-repożitorju tiegħi. Aħna nikklonu r-repożitorju lilna nfusna. Qabel ma tibda, trid tiżgura li għandek AWS CLI installat u kkonfigurat, minħabba li... Terraform se tfittex kredenzjali fil-fajl ~/.aws/credentials.

Prattika tajba hija li tħaddem il-kmand tal-pjan qabel ma tuża l-infrastruttura kollha biex tara x'qed joħloq Terraform bħalissa għalina fil-cloud:

terraform.exe plan

Inti se tintalab iddaħħal numru tat-telefon biex tibgħatlu notifiki. Mhux meħtieġ li tidħol fiha f'dan l-istadju.

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Wara li analizzat il-pjan tat-tħaddim tal-programm, nistgħu nibdew noħolqu riżorsi:

terraform.exe apply

Wara li tibgħat dan il-kmand, terġa' tintalab iddaħħal numru tat-telefon; ċemplil "iva" meta tintwera mistoqsija dwar it-twettiq tal-azzjonijiet fil-fatt. Dan jippermettilek li twaqqaf l-infrastruttura kollha, twettaq il-konfigurazzjoni kollha meħtieġa ta 'EC2, tuża funzjonijiet lambda, eċċ.

Wara li r-riżorsi kollha jkunu nħolqu b'suċċess permezz tal-kodiċi Terraform, trid tidħol fid-dettalji tal-applikazzjoni Kinesis Analytics (sfortunatament, ma sibtx kif tagħmel dan direttament mill-kodiċi).

Ibda l-applikazzjoni:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Wara dan, trid issettja b'mod espliċitu l-isem tan-nixxiegħa fl-applikazzjoni billi tagħżel mil-lista li tinżel:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Issa kollox lest biex imur.

Ittestjar tal-applikazzjoni

Irrispettivament minn kif skjerajt is-sistema, manwalment jew permezz tal-kodiċi Terraform, se taħdem l-istess.

Aħna nilloggjaw permezz ta' SSH mal-magna virtwali EC2 fejn ikun installat Kinesis Agent u nħaddmu l-iskrittura api_caller.py

sudo ./api_caller.py TOKEN

Kull ma trid tagħmel hu li tistenna SMS fuq in-numru tiegħek:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
SMS - il-messaġġ jasal fuq it-telefon fi kważi minuta:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless
Għad irid jara jekk ir-rekords ġewx salvati fid-database DynamoDB għal analiżi sussegwenti u aktar dettaljata. It-tabella airline_tickets fiha madwar id-dejta li ġejja:

Aviasales API integrazzjoni ma 'Amazon Kinesis u sempliċità serverless

Konklużjoni

Matul ix-xogħol li sar, inbniet sistema onlajn għall-ipproċessar tad-dejta bbażata fuq Amazon Kinesis. Għażliet għall-użu tal-Aġent Kinesis flimkien ma 'Kinesis Data Streams u analitika f'ħin reali Kinesis Analytics bl-użu ta' kmandi SQL, kif ukoll l-interazzjoni ta 'Amazon Kinesis ma' servizzi AWS oħra ġew ikkunsidrati.

Aħna wżajna s-sistema ta 'hawn fuq b'żewġ modi: waħda manwali pjuttost twila u waħda ta' malajr mill-kodiċi Terraform.

Il-kodiċi tas-sors tal-proġett kollu huwa disponibbli fir-repożitorju GitHub tiegħi, Nissuġġerixxi li tiffamiljarizza ruħek magħha.

Ninsab kuntent li niddiskuti l-artiklu, nistenna bil-ħerqa l-kummenti tiegħek. Nittama għal kritika kostruttiva.

Nixtieqek suċċess!

Sors: www.habr.com

Żid kumment