Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud

Haai Habr!

Hou jy daarvan om op vliegtuie te vlieg? Ek is mal daaroor, maar tydens selfisolasie het ek ook verlief geraak op die ontleding van data oor vliegkaartjies van een bekende hulpbron – Aviasales.

Vandag sal ons die werk van Amazon Kinesis ontleed, 'n stroomstelsel bou met intydse analise, die Amazon DynamoDB NoSQL-databasis installeer as die hoofdatawinkel, en kennisgewing per SMS opstel vir interessante kaartjies.

Al die besonderhede onder die snit! Gaan!

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud

Inleiding

Ons het byvoorbeeld toegang nodig tot Aviasales API. Toegang tot dit word gratis verskaf en sonder beperkings, jy hoef net in die "Ontwikkelaars"-afdeling te registreer om jou API-token te kry om toegang tot data te verkry.

Die hoofdoel van hierdie artikel is om 'n algemene begrip te gee van die gebruik van stroominligting in AWS, ons haal dit uit die hakies dat die data wat deur die gebruikte API teruggestuur word nie streng op datum is nie en vanaf die kas versend word , wat gevorm word op grond van soektogte deur gebruikers van die Aviasales.ru en Jetradar.com webwerwe vir die afgelope 48 uur.

Die Kinesis-agent wat op die vervaardigermasjien geïnstalleer is, ontvang via die API, sal outomaties die data op die kaartjies ontleed en oordra na die vereiste stroom via Kinesis Data Analytics. Die rou weergawe van hierdie stroom sal direk na die bewaarplek geskryf word. Die stoor van rou data wat in DynamoDB ontplooi word, sal dieper ontleding van kaartjies moontlik maak deur BI-nutsgoed, soos AWS Quick Sight.

Ons sal twee opsies oorweeg om die hele infrastruktuur te ontplooi:

  • Handleiding - via AWS Management Console;
  • Infrastruktuur van die Terraform-kode - vir lui outomateerders;

Die argitektuur van die ontwikkelde stelsel

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Gebruikte komponente:

  • Aviasales API - die data wat deur hierdie API teruggestuur word, sal vir alle daaropvolgende werk gebruik word;
  • EC2 Producer Instance - 'n gewone virtuele masjien in die wolk, waarop die insetdatastroom gegenereer sal word:
    • Kinesis Agent is 'n Java-toepassing wat plaaslik op 'n masjien geïnstalleer is wat 'n maklike manier bied om data te versamel en na Kinesis (Kinesis Data Streams of Kinesis Firehose) te stuur. Die agent monitor voortdurend 'n stel lêers in die gespesifiseerde dopgehou en stuur nuwe data na Kinesis;
    • API-oproeperskrif - 'n Python-skrip wat versoeke aan die API rig en die reaksie by 'n gids voeg wat deur die Kinesis Agent gemonitor word;
  • Kinesis Datastrome — Intydse stroomdiens met hoë skaalbaarheid;
  • Kinesis Analytics is 'n bedienerlose diens wat die ontleding van stroomdata in reële tyd vergemaklik. Amazon Kinesis Data Analytics konfigureer toepassingshulpbronne en skaal outomaties om enige volume inkomende data te hanteer;
  • AWS Lambda - 'n diens waarmee u kode kan laat loop sonder oortolligheid en bedienerkonfigurasie. Alle rekenaarkrag word outomaties vir elke oproep afgeskaal;
  • Amazon DynamoDB - 'n databasis van sleutel-waarde-pare en dokumente wat 'n vertraging van minder as 10 millisekondes bied wanneer daar op enige skaal gewerk word. Met DynamoDB hoef jy nie enige bedieners te voorsien, reg te maak of te bestuur nie. DynamoDB skaal outomaties tabelle om aan te pas vir beskikbare hulpbronne terwyl hoë werkverrigting gehandhaaf word. Geen stelseladministrasie word vereis nie;
  • Amazon SNS is 'n volledig bestuurde publiseer-inteken (Pub/Sub) boodskapdiens wat mikrodienste, verspreide stelsels en bedienerlose toepassings kan isoleer. SNS kan gebruik word om inligting aan eindgebruikers te versprei deur mobiele stootkennisgewings, SMS-boodskappe en e-posse.

Aanvanklike opleiding

Om die datavloei na te boots, het ek besluit om die vliegkaartjieinligting te gebruik wat deur die Aviasales API teruggestuur word. IN dokumentasie nogal 'n uitgebreide lys van verskillende metodes, kom ons neem een ​​van hulle - "Maandelikse Pryskalender", wat pryse vir elke dag van die maand gee, gegroepeer volgens die aantal oordragte. As jy nie die maand van die soektog in die navraag stuur nie, sal die inligting vir die maand wat volg op die huidige een teruggestuur word.

So, registreer, kry jou teken.

'n Voorbeeldversoek is hieronder:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Die bogenoemde metode om data van die API af te kry met die teken in die versoek sal werk, maar ek verkies om die toegangstoken deur die kop te stuur, so ons sal hierdie metode in die api_caller.py-skrip gebruik.

Antwoord voorbeeld:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Die voorbeeld API-reaksie hierbo wys 'n kaartjie van St. Petersburg na Phuk ... O, wat 'n droom ...
Aangesien ek van Kazan is, en Phuket nou "ons droom net" is, sal ons kaartjies van St. Petersburg na Kazan soek.

Dit neem aan dat u reeds 'n AWS-rekening het. Ek wil dadelik spesiale aandag gee aan die feit dat Kinesis en die stuur van kennisgewings per SMS nie by die jaarlikse ingesluit is nie Gratis vlak (gratis om te gebruik). Maar tog, met 'n paar dollar in gedagte, is dit heel moontlik om die voorgestelde stelsel te bou en daarmee te speel. En, natuurlik, moenie vergeet om alle hulpbronne uit te vee nadat dit nie meer nodig is nie.

Gelukkig sal DynamoDb- en lambda-funksies vir ons deelware wees, solank ons ​​aan die maandelikse gratis limiete voldoen. Byvoorbeeld, vir DynamoDB: 25 GB berging, 25 WCU/RCU en 100 miljoen navrae. En 'n miljoen lambda-funksie-oproepe per maand.

Handmatige stelselontplooiing

Die opstel van Kinesis-datastrome

Kom ons gaan na die Kinesis Data Streams-diens en skep twee nuwe strome, een skerf elk.

Wat is 'n skerf?
'n Skerf is die basiese data-oordrageenheid van 'n Amazon Kinesis-stroom. Een segment verskaf 1 MB/s invoer en 2 MB/s uitvoer. Een segment ondersteun tot 1000 PUT-rekords per sekonde. Wanneer u 'n datavloei skep, moet u die verlangde aantal segmente spesifiseer. Byvoorbeeld, jy kan 'n datastroom met twee segmente skep. Hierdie datastroom sal 2 MB/s invoer en 4 MB/s uitvoer verskaf, wat tot 2000 PUTs per sekonde ondersteun.

Hoe meer skerwe in jou stroom, hoe groter is die deurset daarvan. In beginsel is dit hoe vloeie afgeskaal word – deur skerwe by te voeg. Maar hoe meer skerwe jy het, hoe hoër is die prys. Elke skerf kos 1,5 sent per uur en 'n bykomende 1.4 sent vir elke miljoen PUT-loonvrag-eenhede.

Kom ons skep 'n nuwe draad met die naam vliegtuigkaartjies, 1 skerf sal vir hom genoeg wees:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Kom ons skep nou 'n ander draad met die naam spesiale_stroom:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud

Vervaardiger opstelling

Dit is genoeg om 'n gewone EC2-instansie as 'n datavervaardiger te gebruik om die taak te ontleed. Dit hoef nie 'n kragtige, duur VM te wees nie, 'n spot t2.micro is goed.

Belangrike nota: jy moet byvoorbeeld beeld gebruik - Amazon Linux AMI 2018.03.0, met minder instellings om Kinesis Agent vinnig te begin.

Ons gaan na die EC2-diens, skep 'n nuwe virtuele masjien, kies die gewenste AMI met die t2.micro-tipe, wat ingesluit is in die Free Tier:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Om die nuutgeskepte virtuele masjien met die Kinesis-diens te kan kommunikeer, moet dit toestemming gegee word om dit te doen. Die beste manier om dit te doen is om 'n IAM-rol toe te wys. Daarom, op die Stap 3: Stel instansiebesonderhede skerm, kies Skep nuwe IAM-rol:

Skep 'n IAM-rol vir EC2
Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Kies in die venster wat oopmaak dat ons 'n nuwe rol vir EC2 skep en gaan na die Toestemmings-afdeling:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Deur die opleidingsvoorbeeld te gebruik, kan u nie ingaan op al die ingewikkeldhede van korrelkonfigurasie van hulpbronregte nie, daarom sal ons die beleide kies wat vooraf deur Amazon gekonfigureer is: AmazonKinesisFullAccess en CloudWatchFullAccess.

Kom ons gee 'n betekenisvolle naam vir hierdie rol, byvoorbeeld: EC2-KinesisStreams-FullAccess. As gevolg hiervan, moet jy dieselfde kry as in die prentjie hieronder:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Nadat u hierdie nuwe rol geskep het, moenie vergeet om dit aan die geskepde virtuele masjien-instansie te heg nie:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Ons verander niks anders op hierdie skerm nie en gaan aan na die volgende vensters.

Jy kan die hardeskyfparameters as verstek laat, etikette ook (alhoewel dit goeie praktyk is om etikette te gebruik, noem ten minste die instansie en spesifiseer die omgewing).

Nou is ons op die Stap 6: Stel Sekuriteitsgroep-oortjie in, waar jy 'n nuwe een moet skep of die Sekuriteitsgroep moet spesifiseer wat jy het, wat jou toelaat om via ssh (poort 22) aan die instansie te koppel. Kies Bron -> My IP daar en jy kan die instansie begin.

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Sodra dit verander na die lopende status, kan jy probeer om dit via ssh te koppel.

Om met Kinesis Agent te kan werk, nadat u suksesvol aan die masjien gekoppel het, moet u die volgende opdragte in die terminaal invoer:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Kom ons skep 'n gids om die API-antwoorde te stoor:

sudo mkdir /var/log/airline_tickets

Voordat u die agent begin, moet u die konfigurasie daarvan instel:

sudo vim /etc/aws-kinesis/agent.json

Die inhoud van die agent.json-lêer moet soos volg lyk:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Soos gesien kan word uit die konfigurasielêer, sal die agent lêers met die .log-uitbreiding in die /var/log/airline_tickets/-gids monitor, dit ontleed en na die airline_tickets-stroom deurgee.

Herbegin die diens en maak seker dat dit aan die gang is:

sudo service aws-kinesis-agent restart

Kom ons laai nou 'n Python-skrip af wat data van die API sal aanvra:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Die api_caller.py-skrip versoek data van Aviasales en stoor die ontvangde antwoord in die gids wat die Kinesis-agent skandeer. Die implementering van hierdie skrif is redelik standaard, daar is 'n TicketsApi-klas, dit laat jou toe om die API asynchronies te trek. Ons gee die kopskrif met die teken en versoek parameters na hierdie klas:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Om die korrektheid van die instellings en die werkbaarheid van die agent te toets, sal ons die api_caller.py-skrip toets:

sudo ./api_caller.py TOKEN

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
En ons kyk na die resultaat van werk in die Agent-logboeke en op die Monitering-oortjie in die airline_tickets-datastroom:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Soos u kan sien, werk alles en Kinesis Agent stuur data suksesvol na die stroom. Kom ons stel nou die verbruiker op.

Die opstel van Kinesis Data Analytics

Kom ons gaan aan na die sentrale komponent van die hele stelsel - kom ons skep 'n nuwe toepassing in Kinesis Data Analytics genaamd kinesis_analytics_airlines_app:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Kinesis Data Analytics laat jou toe om intydse data-analise uit Kinesis Streams uit te voer met behulp van die SQL-taal. Dit is 'n volledig outoskaaldiens (anders as Kinesis Streams) wat:

  1. laat jou toe om nuwe strome (Uitvoerstroom) te skep gebaseer op navrae na die brondata;
  2. verskaf 'n stroom met foute wat tydens die werking van toepassings plaasgevind het (Foutstroom);
  3. kan outomaties die skema van die invoerdata bepaal (dit kan met die hand oorskryf word indien nodig).

Dit is 'n duur diens - 0.11 USD per uur se werk, so jy moet dit versigtig gebruik en dit uitvee wanneer jy klaar is met werk.

Kom ons koppel die toepassing aan die databron:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Kies die stroom waaraan ons gaan koppel (airline_tickets):

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Vervolgens moet jy 'n nuwe IAM-rol aanheg sodat die toepassing uit die stroom kan lees en na die stroom kan skryf. Om dit te doen, is dit genoeg om niks in die Toegangstoestemmingsblok te verander nie:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Nou sal ons die ontdekking van die dataskema in die stroom versoek, hiervoor klik ons ​​op die "Ontdek skema" -knoppie. As gevolg hiervan sal die IAM-rol opgedateer (geskep) word en skema-ontdekking sal begin word vanaf die data wat reeds in die stroom aangekom het:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Nou moet jy na die SQL-redigeerder gaan. Wanneer jy op hierdie knoppie klik, sal 'n venster verskyn met 'n vraag oor die bekendstelling van die toepassing - kies wat ons wil begin:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Voeg die volgende eenvoudige navraag in die SQL-redigeerdervenster in en klik Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relasionele databasisse werk jy met tabelle deur INSERT-stellings te gebruik om rekords by te voeg en SELECT-stellings om data te bevraagteken. In Amazon Kinesis Data Analytics werk jy met strome (STREAM) en "pompe" (PUMP's), deurlopende invoegnavrae wat data van een stroom in jou toepassing in 'n ander stroom invoeg.

Die bogenoemde SQL-navraag soek na Aeroflot-kaartjies teen 'n koste van minder as vyfduisend roebels. Alle rekords wat by hierdie voorwaardes pas, sal op die DESTINATION_SQL_STREAM-stroom geplaas word.

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
In die Bestemming-blok, kies die spesiale_stroomstroom, en in die In-toepassing stroomnaam aftreklys DESTINATION_SQL_STREAM:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
As gevolg van al die manipulasies, moet jy iets kry soortgelyk aan die prentjie hieronder:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud

Skep en teken in op 'n SNS-onderwerp

Gaan na die Simple Notification Service en skep 'n nuwe onderwerp daar met die naam Airlines:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Ons teken in op hierdie onderwerp, daarin dui ons die selfoonnommer aan waarna SMS-kennisgewings gestuur sal word:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud

Skep 'n tabel in DynamoDB

Om die rou data van die airline_tickets-stroom te stoor, kom ons skep 'n tabel in DynamoDB met dieselfde naam. Ons sal record_id as die primêre sleutel gebruik:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud

Die skep van 'n lambda funksie versamelaar

Kom ons skep 'n lambda-funksie genaamd Collector, wie se taak is om die airline_tickets-stroom te poll en, indien nuwe rekords daar gevind word, hierdie rekords in die DynamoDB-tabel in te voeg. Uiteraard moet hierdie lambda, benewens die verstektoestemmings, toegang hê om die Kinesis-datastroom te lees en na DynamoDB te skryf.

Skep 'n IAM-rol vir die versamelaar lambda-funksie
Kom ons skep eers 'n nuwe IAM lambda-rol genaamd Lambda-TicketsProcessingRole:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Vir 'n toetsvoorbeeld is die vooraf gekonfigureerde AmazonKinesisReadOnlyAccess- en AmazonDynamoDBFullAccess-beleide redelik geskik, soos in die prentjie hieronder getoon:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud

Hierdie lambda behoort deur Kinesis geaktiveer te word wanneer nuwe inskrywings die airline_stream-stroom binnegaan, so ons moet 'n nuwe sneller byvoeg:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Dit bly om die kode te plak en die lambda te stoor.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Skep 'n lambda-funksie-kennisgewer

Die tweede lambda-funksie, wat die tweede stroom (spesiale_stroom) sal monitor en 'n kennisgewing aan die SNS sal stuur, word op dieselfde manier geskep. Daarom moet hierdie lambda toegang hê om vanaf Kinesis te lees en boodskappe na 'n gegewe SNS-onderwerp te stuur, wat dan deur die SNS-diens aan alle intekenare van hierdie onderwerp (e-pos, SMS, ens.) gestuur sal word.

Skep 'n IAM-rol
Eerstens skep ons die Lambda-KinesisAlarm IAM-rol vir hierdie lambda, en dan ken ons hierdie rol toe aan die geskepte alarm_notifier lambda:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud

Hierdie lambda moet op 'n sneller werk vir nuwe rekords om die special_stream stroom te betree, so jy moet die sneller op dieselfde manier konfigureer as wat ons vir die Collector lambda gedoen het.

Vir die gerief om hierdie lambda op te stel, kom ons stel 'n nuwe omgewingsveranderlike bekend - TOPIC_ARN, waar ons die ANR (Amazon Recourse Names) van die Lugdiens-onderwerp plaas:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
En ons voeg die lambda-kode in, dit is redelik eenvoudig:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Dit blyk dat hierdie handmatige konfigurasie van die stelsel voltooi is. Dit bly net om te toets en seker te maak dat ons alles korrek opstel.

Ontplooi vanaf Terraform-kode

Vereiste voorbereiding

terraform - 'n baie gerieflike oopbron-instrument vir die implementering van infrastruktuur vanaf kode. Dit het sy eie sintaksis, wat maklik is om te leer en baie voorbeelde van hoe en wat om te ontplooi. Daar is baie handige inproppe in die Atom-redigeerder of Visual Studio Code wat dit makliker maak om met Terraform te werk.

Verspreider kan afgelaai word vandaar. 'N Gedetailleerde ontleding van al die kenmerke van Terraform is buite die bestek van hierdie artikel, so ons sal ons beperk tot die hoofpunte.

Hoe om te begin

Die volledige projekkode is in my bewaarplek. Ons kloon ons bewaarplek. Voordat jy begin, moet jy seker maak dat jy AWS CLI geïnstalleer en gekonfigureer het, want. Terraform sal na geloofsbriewe in die ~/.aws/credentials-lêer soek.

Dit is goeie praktyk om die planopdrag uit te voer voordat die hele infrastruktuur ontplooi word om te sien wat Terraform tans vir ons in die wolk skep:

terraform.exe plan

Jy sal gevra word om 'n telefoonnommer in te voer om kennisgewings na te stuur. Op hierdie stadium is dit nie nodig om dit in te voer nie.

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Nadat ons die programwerkplan ontleed het, kan ons begin om hulpbronne te skep:

terraform.exe apply

Nadat u hierdie opdrag gestuur het, sal u weer gevra word om 'n telefoonnommer in te voer, tik "ja" wanneer die vraag oor die werklike uitvoering van aksies vertoon word. Dit sal jou toelaat om die hele infrastruktuur te verhoog, al die nodige konfigurasie van EC2 uit te voer, lambda-funksies te ontplooi, ens.

Nadat alle hulpbronne suksesvol deur die Terraform-kode geskep is, moet u die besonderhede van die Kinesis Analytics-toepassing ingaan (ongelukkig het ek nie gevind hoe om dit direk vanaf die kode te doen nie).

Ons begin die toepassing:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Daarna moet u die naam in die toepassingstroom uitdruklik stel deur uit die aftreklys te kies:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Nou is alles gereed om te gaan.

Toepassingstoetsing

Ongeag hoe jy die stelsel ontplooi het, met die hand of deur die Terraform-kode, sal dit op dieselfde manier werk.

Ons gaan via SSH na die EC2 virtuele masjien waar Kinesis Agent geïnstalleer is en hardloop die script api_caller.py

sudo ./api_caller.py TOKEN

Dit bly om te wag vir 'n SMS na jou nommer:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
SMS - 'n boodskap kom binne byna 1 minuut op die telefoon:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Dit moet nog gesien word of die rekords in die DynamoDB-databasis bewaar word vir daaropvolgende, meer gedetailleerde ontleding. Die vliegtuigkaartjies-tabel bevat iets soos hierdie:

Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud

Gevolgtrekking

In die loop van die werk wat gedoen is, is 'n aanlyn dataverwerkingstelsel gebaseer op Amazon Kinesis gebou. Die opsies vir die gebruik van Kinesis Agent in samewerking met Kinesis Datastrome en intydse analise van Kinesis Analytics deur gebruik te maak van SQL-opdragte, sowel as die interaksie van Amazon Kinesis met ander AWS-dienste, is oorweeg.

Ons het die bogenoemde stelsel op twee maniere ontplooi: 'n taamlik lang handleiding en 'n vinnige een vanaf die Terraform-kode.

Alle projekbronkode is beskikbaar in my GitHub-bewaarplekEk stel voor jy kyk daarna.

Ek bespreek graag die artikel, ek sien uit na u kommentaar. Hoop vir opbouende kritiek.

Ek wens jou sukses toe!

Bron: will.com

Voeg 'n opmerking