Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika

Hæ Habr!

Finnst þér gaman að fljúga flugvélum? Ég elska það, en í sjálfeinangrun varð ég líka ástfanginn af því að greina gögn um flugmiða frá einni vel þekktri auðlind - Aviasales.

Í dag munum við greina vinnu Amazon Kinesis, byggja upp streymiskerfi með rauntímagreiningum, setja upp Amazon DynamoDB NoSQL gagnagrunninn sem aðalgagnageymsluna og setja upp SMS tilkynningar fyrir áhugaverða miða.

Öll smáatriði eru undir skurðinum! Farðu!

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika

Inngangur

Fyrir dæmið þurfum við aðgang að Aviasales API. Aðgangur að því er veittur ókeypis og án takmarkana; þú þarft bara að skrá þig í hlutanum „Hönnuðir“ til að fá API táknið þitt til að fá aðgang að gögnunum.

Megintilgangur þessarar greinar er að gefa almennan skilning á notkun upplýsingastreymis í AWS; við tökum tillit til þess að gögnin sem skilað er af API sem notað er eru ekki nákvæmlega uppfærð og eru send úr skyndiminni, sem er mynduð byggt á leit notenda á Aviasales.ru og Jetradar.com síðunum síðustu 48 klukkustundir.

Kinesis-umboðsmaður, settur upp á framleiðsluvélinni, móttekinn í gegnum API mun sjálfkrafa þátta og senda gögn í viðkomandi straum í gegnum Kinesis Data Analytics. Hrá útgáfan af þessum straumi verður skrifuð beint í verslunina. Hrágagnageymslan sem notuð er í DynamoDB mun leyfa dýpri miðagreiningu í gegnum BI verkfæri, eins og AWS Quick Sight.

Við munum íhuga tvo valkosti til að dreifa öllu innviði:

  • Handbók - í gegnum AWS Management Console;
  • Innviðir frá Terraform kóða eru fyrir lata sjálfvirka;

Arkitektúr þróaðs kerfis

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Íhlutir notaðir:

  • Aviasales API — gögnin sem þetta API skilar verða notuð fyrir alla síðari vinnu;
  • EC2 framleiðanda dæmi — venjuleg sýndarvél í skýinu þar sem inntaksgagnastraumurinn verður búinn til:
    • Kinesis umboðsmaður er Java forrit uppsett á staðnum á vélinni sem veitir auðvelda leið til að safna og senda gögn til Kinesis (Kinesis Data Streams eða Kinesis Firehose). Umboðsmaðurinn fylgist stöðugt með safni skráa í tilgreindum möppum og sendir ný gögn til Kinesis;
    • API Caller Script — Python forskrift sem gerir beiðnir til API og setur svarið í möppu sem Kinesis umboðsmaðurinn hefur eftirlit með;
  • Kinesis gagnastraumar — rauntíma gagnastraumsþjónusta með víðtækri stærðarmöguleika;
  • Kinesis Analytics er netþjónalaus þjónusta sem einfaldar greiningu á streymigögnum í rauntíma. Amazon Kinesis Data Analytics stillir forritaauðlindir og skalar sjálfkrafa til að meðhöndla hvaða magn af komandi gögnum;
  • AWS Lambda — þjónusta sem gerir þér kleift að keyra kóða án þess að taka öryggisafrit eða setja upp netþjóna. Öll tölvuafl er sjálfkrafa skaluð fyrir hvert símtal;
  • Amazon DynamoDB - Gagnagrunnur með lykilgildapörum og skjölum sem veitir leynd sem er minna en 10 millisekúndur þegar keyrt er á hvaða mælikvarða sem er. Þegar þú notar DynamoDB þarftu ekki að útvega, lagfæra eða stjórna neinum netþjónum. DynamoDB skalar töflur sjálfkrafa til að stilla magn tiltækra auðlinda og viðhalda mikilli afköstum. Engin kerfisstjórnun er nauðsynleg;
  • Amazon SNS - fullstýrð þjónusta til að senda skilaboð með útgefanda-áskrifanda (Pub/Sub) líkaninu, sem þú getur einangrað örþjónustur, dreifð kerfi og netþjónalaus forrit. SNS er hægt að nota til að senda upplýsingar til endanotenda í gegnum farsímatilkynningar, SMS skilaboð og tölvupóst.

Byrjunarþjálfun

Til að líkja eftir gagnaflæðinu ákvað ég að nota flugmiðaupplýsingarnar sem Aviasales API skilaði. IN skjöl nokkuð víðtækur listi yfir mismunandi aðferðir, við skulum taka eina af þeim - „Mánaðarverðadagatal“, sem skilar verði fyrir hvern dag mánaðar, flokkað eftir fjölda flutninga. Ef þú tilgreinir ekki leitarmánuðinn í beiðninni verður upplýsingum skilað fyrir mánuðinn á eftir þeim mánuði sem er í gildi.

Svo, við skulum skrá okkur og fá táknið okkar.

Dæmi um beiðni er hér að neðan:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Ofangreind aðferð til að taka á móti gögnum frá API með því að tilgreina tákn í beiðninni mun virka, en ég kýs að senda aðgangslykilinn í gegnum hausinn, svo við munum nota þessa aðferð í api_caller.py forskriftinni.

Svar dæmi:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Dæmi um API svar hér að ofan sýnir miða frá Sankti Pétursborg til Phuk... Ó, hvílíkur draumur...
Þar sem ég er frá Kazan og Phuket er nú „aðeins draumur“ skulum við leita að miðum frá Sankti Pétursborg til Kazan.

Það gerir ráð fyrir að þú sért nú þegar með AWS reikning. Ég vil strax vekja sérstaka athygli á því að Kinesis og sending tilkynninga í gegnum SMS er ekki innifalin í árlegum Ókeypis stig (ókeypis notkun). En þrátt fyrir þetta, með nokkra dollara í huga, er alveg hægt að byggja upp fyrirhugað kerfi og spila með það. Og, auðvitað, ekki gleyma að eyða öllum auðlindum eftir að þeirra er ekki lengur þörf.

Sem betur fer verða DynamoDb og lambda aðgerðir ókeypis fyrir okkur ef við uppfyllum mánaðarleg ókeypis mörk okkar. Til dæmis, fyrir DynamoDB: 25 GB geymslupláss, 25 WCU/RCU og 100 milljón fyrirspurnir. Og milljón lambda virka símtöl á mánuði.

Handvirk uppsetning kerfis

Uppsetning Kinesis gagnastrauma

Við skulum fara í Kinesis Data Streams þjónustuna og búa til tvo nýja strauma, eitt brot fyrir hvern.

Hvað er skarð?
Shard er grunngagnaflutningseining Amazon Kinesis straums. Einn hluti veitir inntaksgagnaflutning á 1 MB/s hraða og úttaksgagnaflutningur á 2 MB/s hraða. Einn hluti styður allt að 1000 PUT færslur á sekúndu. Þegar þú býrð til gagnastraum þarftu að tilgreina nauðsynlegan fjölda hluta. Til dæmis er hægt að búa til gagnastraum með tveimur hlutum. Þessi gagnastraumur mun veita inntaksgagnaflutning á 2 MB/s og útflutningsgagnaflutningi á 4 MB/s, sem styður allt að 2000 PUT færslur á sekúndu.

Því fleiri brot í straumnum þínum, því meiri afköst hans. Í grundvallaratriðum er þetta hvernig flæði er kvarðað - með því að bæta við brotum. En því fleiri brot sem þú hefur, því hærra verð. Hvert brot kostar 1,5 sent á klukkustund og 1.4 sent til viðbótar fyrir hverja milljón PUT-burðareininga.

Búum til nýjan straum með nafninu flugmiðar, 1 skarð mun duga honum:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Nú skulum við búa til annan þráð með nafninu sérstakur_straumur:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika

Uppsetning framleiðanda

Til að greina verkefni er nóg að nota venjulegt EC2 tilvik sem gagnaframleiðandi. Það þarf ekki að vera öflug, dýr sýndarvél; staðsetning t2.micro mun duga vel.

Mikilvæg athugasemd: til dæmis ættir þú að nota mynd - Amazon Linux AMI 2018.03.0, það hefur færri stillingar til að ræsa Kinesis Agent fljótt.

Farðu í EC2 þjónustuna, búðu til nýja sýndarvél, veldu viðeigandi AMI með gerð t2.micro, sem er innifalinn í ókeypis stiginu:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Til þess að nýstofnaða sýndarvélin geti haft samskipti við Kinesis þjónustuna verður hún að fá réttindi til þess. Besta leiðin til að gera þetta er að úthluta IAM hlutverki. Þess vegna ættir þú að velja á skjánum Skref 3: Stilla upplýsingar um tilvik Búðu til nýtt IAM hlutverk:

Að búa til IAM hlutverk fyrir EC2
Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Í glugganum sem opnast velurðu að við séum að búa til nýtt hlutverk fyrir EC2 og farðu í heimildahlutann:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Með því að nota þjálfunardæmið þurfum við ekki að fara í allar ranghala nákvæmar uppsetningar á auðlindaréttindum, svo við veljum reglurnar sem Amazon hefur fyrirfram stillt: AmazonKinesisFullAccess og CloudWatchFullAccess.

Við skulum gefa þessu hlutverki eitthvað þýðingarmikið nafn, til dæmis: EC2-KinesisStreams-FullAccess. Niðurstaðan ætti að vera sú sama og sést á myndinni hér að neðan:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Eftir að hafa búið til þetta nýja hlutverk, ekki gleyma að hengja það við búið til sýndarvélartilvik:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Við breytum engu öðru á þessum skjá og förum yfir í næstu glugga.

Hægt er að hafa stillingar á harða disknum sem sjálfgefnar, sem og merki (þó að það sé góð venja að nota merki, að minnsta kosti gefa tilvikinu nafn og gefa til kynna umhverfið).

Nú erum við á skrefi 6: Stilla öryggishóp flipann, þar sem þú þarft að búa til nýjan eða tilgreina núverandi öryggishóp, sem gerir þér kleift að tengjast í gegnum ssh (gátt 22) við tilvikið. Veldu Source -> My IP þar og þú getur ræst dæmið.

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Um leið og það skiptir yfir í hlaupandi stöðu geturðu reynt að tengjast því í gegnum ssh.

Til að geta unnið með Kinesis Agent, eftir að hafa tengst vel við vélina, verður þú að slá inn eftirfarandi skipanir í flugstöðinni:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Við skulum búa til möppu til að vista API svör:

sudo mkdir /var/log/airline_tickets

Áður en þú byrjar umboðsmanninn þarftu að stilla stillingar hans:

sudo vim /etc/aws-kinesis/agent.json

Innihald agent.json skrárinnar ætti að líta svona út:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Eins og sést á stillingarskránni mun umboðsmaðurinn fylgjast með skrám með .log endingunni í /var/log/airline_tickets/ skránni, flokka þær og flytja þær yfir á airline_tickets strauminn.

Við endurræsum þjónustuna og tryggjum að hún sé í gangi:

sudo service aws-kinesis-agent restart

Nú skulum við hlaða niður Python handritinu sem mun biðja um gögn frá API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py handritið biður um gögn frá Aviasales og vistar móttekið svar í möppunni sem Kinesis umboðsmaðurinn skannar. Framkvæmd þessa handrits er alveg staðlað, það er TicketsApi flokkur, það gerir þér kleift að draga API ósamstillt. Við sendum haus með tákni og biðjum um færibreytur í þennan flokk:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Til að prófa réttar stillingar og virkni umboðsmannsins skulum við prófa api_caller.py forskriftina:

sudo ./api_caller.py TOKEN

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Og við skoðum niðurstöðu vinnunnar í umboðsskrám og á flipanum Vöktun í gagnastraumi flugmiða:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Eins og þú sérð virkar allt og Kinesis Agent sendir gögn í strauminn. Nú skulum við stilla neytendur.

Uppsetning Kinesis Data Analytics

Við skulum halda áfram að miðlæga hluta alls kerfisins - búðu til nýtt forrit í Kinesis Data Analytics sem heitir kinesis_analytics_airlines_app:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Kinesis Data Analytics gerir þér kleift að framkvæma rauntíma gagnagreiningu frá Kinesis Streams með því að nota SQL tungumálið. Það er fullkomlega sjálfstýrð þjónusta (ólíkt Kinesis Streams) sem:

  1. gerir þér kleift að búa til nýja strauma (Output Stream) byggt á beiðnum um að fá gögn;
  2. veitir straum villur sem komu upp á meðan forrit voru í gangi (Villustraumur);
  3. getur sjálfkrafa ákvarðað inntaksgagnakerfið (hægt að endurskilgreina það handvirkt ef þörf krefur).

Þetta er ekki ódýr þjónusta - 0.11 USD á vinnustund, svo þú ættir að nota hana varlega og eyða henni þegar þú ert búinn.

Við skulum tengja forritið við gagnagjafann:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Veldu strauminn sem við ætlum að tengjast (airline_tickets):

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Næst þarftu að hengja nýtt IAM hlutverk svo að forritið geti lesið úr straumnum og skrifað í strauminn. Til að gera þetta er nóg að breyta engu í aðgangsheimildablokkinni:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Nú skulum við biðja um uppgötvun á gagnastefinu í straumnum; til að gera þetta, smelltu á hnappinn „Uppgötvaðu skema“. Fyrir vikið verður IAM hlutverkið uppfært (nýtt verður búið til) og skemaskynjun verður ræst úr gögnunum sem þegar hafa borist í strauminn:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Nú þarftu að fara í SQL ritilinn. Þegar þú smellir á þennan hnapp birtist gluggi sem biður þig um að ræsa forritið - veldu það sem þú vilt opna:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Settu eftirfarandi einfalda fyrirspurn inn í SQL ritstjóragluggann og smelltu á Vista og keyra SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Í venslagagnagrunnum er unnið með töflur með því að nota INSERT setningar til að bæta við færslum og SELECT setningu til að spyrjast fyrir um gögn. Í Amazon Kinesis Data Analytics vinnurðu með strauma (STREAM) og dælur (PUMPs) — stöðugar innsetningarbeiðnir sem setja gögn úr einum straumi í forriti í annan straum.

SQL fyrirspurnin sem kynnt er hér að ofan leitar að Aeroflot miðum á kostnaði undir fimm þúsund rúblum. Allar færslur sem uppfylla þessi skilyrði verða settar í DESTINATION_SQL_STREAM strauminn.

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Í áfangastaðnum skaltu velja special_stream strauminn og í fellilistanum DESTINATION_SQL_STREAM heiti straums í forriti:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Niðurstaðan af öllum meðhöndlun ætti að vera eitthvað svipað og á myndinni hér að neðan:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika

Að búa til og gerast áskrifandi að SNS efni

Farðu í Simple Notification Service og búðu til nýtt umræðuefni þar með nafninu Flugfélög:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Gerast áskrifandi að þessu efni og tilgreinið farsímanúmerið sem SMS-tilkynningar verða sendar til:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika

Búðu til töflu í DynamoDB

Til að geyma hrá gögnin úr flugmiðastraumnum þeirra skulum við búa til töflu í DynamoDB með sama nafni. Við munum nota record_id sem aðallykil:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika

Að búa til lambda virkni safnara

Við skulum búa til lambda fall sem kallast Collector, sem hefur það verkefni að skoða flugmiðastrauminn og, ef nýjar færslur finnast þar, setja þessar færslur inn í DynamoDB töfluna. Augljóslega, auk sjálfgefna réttinda, verður þessi lambda að hafa lesaðgang að Kinesis gagnastraumnum og skrifa aðgang að DynamoDB.

Að búa til IAM hlutverk fyrir safnara lambda aðgerðina
Fyrst skulum við búa til nýtt IAM hlutverk fyrir lambda sem heitir Lambda-TicketsProcessingRole:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Fyrir prófunardæmið eru forstilltu AmazonKinesisReadOnlyAccess og AmazonDynamoDBFullAccess reglurnar mjög hentugar, eins og sýnt er á myndinni hér að neðan:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika

Þessi lambda ætti að koma af stað með kveikju frá Kinesis þegar nýjar færslur koma inn í flugfélagsstrauminn, svo við þurfum að bæta við nýjum kveikju:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Það eina sem er eftir er að setja kóðann inn og vista lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Að búa til tilkynnanda um lambda virkni

Önnur lambda aðgerðin, sem mun fylgjast með öðrum straumnum (special_stream) og senda tilkynningu til SNS, er búin til á svipaðan hátt. Þess vegna verður þessi lambda að hafa aðgang að því að lesa úr Kinesis og senda skilaboð í tiltekið SNS efni, sem síðan verður sent af SNS þjónustunni til allra áskrifenda þessa efnis (tölvupóstur, SMS, osfrv.).

Að búa til IAM hlutverk
Fyrst búum við til IAM hlutverkið Lambda-KinesisAlarm fyrir þessa lambda, og úthlutum síðan þessu hlutverki við alarm_notifier lambda sem verið er að búa til:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika

Þessi lambda ætti að virka á kveikju fyrir nýjar færslur til að komast inn í special_stream, svo þú þarft að stilla kveikjuna á sama hátt og við gerðum fyrir Collector lambda.

Til að gera það auðveldara að stilla þessa lambda, skulum við kynna nýja umhverfisbreytu - TOPIC_ARN, þar sem við setjum ANR (Amazon Recourse Names) fyrir Air topic:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Og settu inn lambda kóðann, það er alls ekki flókið:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Svo virðist sem þetta sé þar sem handvirkri kerfisstillingu er lokið. Allt sem er eftir er að prófa og ganga úr skugga um að við höfum stillt allt rétt.

Dreifa frá Terraform kóða

Nauðsynlegur undirbúningur

Terraform er mjög þægilegt opinn uppspretta tól til að dreifa innviðum frá kóða. Það hefur sína eigin setningafræði sem auðvelt er að læra og hefur mörg dæmi um hvernig og hvað á að nota. Atom ritstjórinn eða Visual Studio Code hefur margar handhægar viðbætur sem auðvelda vinnu með Terraform.

Hægt er að hlaða niður dreifingunni þess vegna. Ítarleg greining á öllum Terraform getu er utan gildissviðs þessarar greinar, svo við munum takmarka okkur við aðalatriðin.

Hvernig á að hlaupa

Heildarkóði verkefnisins er í geymslunni minni. Við klónum geymsluna til okkar. Áður en þú byrjar þarftu að ganga úr skugga um að þú hafir AWS CLI uppsett og stillt, vegna þess að... Terraform mun leita að skilríkjum í ~/.aws/credentials skránni.

Góð æfing er að keyra áætlunarskipunina áður en þú setur upp allan innviðina til að sjá hvað Terraform er að búa til fyrir okkur í skýinu:

terraform.exe plan

Þú verður beðinn um að slá inn símanúmer til að senda tilkynningar á. Það er ekki nauðsynlegt að slá það inn á þessu stigi.

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Eftir að hafa greint rekstraráætlun áætlunarinnar getum við byrjað að búa til auðlindir:

terraform.exe apply

Eftir að þú sendir þessa skipun verðurðu aftur beðinn um að slá inn símanúmer; hringdu í „já“ þegar spurning um raunverulega framkvæmd aðgerðanna birtist. Þetta gerir þér kleift að setja upp alla innviði, framkvæma allar nauðsynlegar stillingar á EC2, setja upp lambda-aðgerðir osfrv.

Eftir að öll tilföng hafa verið búin til með Terraform kóðanum þarftu að fara í smáatriðin í Kinesis Analytics forritinu (því miður fann ég ekki hvernig á að gera þetta beint úr kóðanum).

Ræstu forritið:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Eftir þetta verður þú að stilla beinlínis heiti straumsins í forritinu með því að velja úr fellilistanum:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Nú er allt tilbúið til að fara.

Að prófa forritið

Óháð því hvernig þú settir kerfið upp, handvirkt eða í gegnum Terraform kóða, mun það virka eins.

Við skráum okkur inn í gegnum SSH á EC2 sýndarvélina þar sem Kinesis Agent er sett upp og keyrum api_caller.py forskriftina

sudo ./api_caller.py TOKEN

Allt sem þú þarft að gera er að bíða eftir SMS í númerið þitt:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
SMS - skilaboðin berast í símann eftir tæpa 1 mínútu:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika
Það á eftir að sjá hvort færslurnar hafi verið vistaðar í DynamoDB gagnagrunninum fyrir síðari, ítarlegri greiningu. Taflan flugmiða inniheldur um það bil eftirfarandi gögn:

Aviasales API samþætting við Amazon Kinesis og netþjónalausan einfaldleika

Ályktun

Sem hluti af vinnunni var smíðað gagnavinnslukerfi á netinu byggt á Amazon Kinesis. Valkostir til að nota Kinesis Agent í tengslum við Kinesis Data Streams og rauntíma greiningar Kinesis Analytics með því að nota SQL skipanir, sem og samspil Amazon Kinesis við aðra AWS þjónustu voru skoðaðir.

Við notuðum ofangreint kerfi á tvo vegu: frekar langt handvirkt og fljótlegt frá Terraform kóðanum.

Allur frumkóði verkefnisins er fáanlegur í GitHub geymslunni minni, ég mæli með að þú kynnir þér það.

Ég er fús til að ræða greinina, ég bíð spenntur eftir athugasemdum þínum. Ég vonast eftir uppbyggilegri gagnrýni.

Ég óska ​​þér velgengni!

Heimild: www.habr.com

Bæta við athugasemd