Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld

Hoi Habr!

Hâldsto fan fleanende fleantugen? Ik hâld derfan, mar tidens selsisolaasje rekke ik ek fereale op it analysearjen fan gegevens oer fleankaarten fan ien bekende boarne - Aviasales.

Hjoed sille wy analysearje it wurk fan Amazon Kinesis, bouwe in streaming systeem mei real-time analytics, ynstallearje de Amazon DynamoDB NoSQL databank as de wichtichste gegevens opslach, en set SMS notifikaasjes foar nijsgjirrige tickets.

Alle details binne ûnder de besuniging! Go!

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld

Ynlieding

Foar it foarbyld, wy moatte tagong ta Aviasales API. Tagong ta it wurdt fergees en sûnder beheiningen levere; jo moatte gewoan registrearje yn 'e seksje "ûntwikkelders" om jo API-token te ûntfangen om tagong te krijen ta de gegevens.

It haaddoel fan dit artikel is om in algemien begryp te jaan fan it gebrûk fan streaming fan ynformaasje yn AWS; wy nimme rekken mei dat de gegevens weromjûn troch de brûkte API net strikt aktueel binne en wurde oerdroegen fanút it cache, dat is foarme basearre op sykopdrachten troch brûkers fan de Aviasales.ru en Jetradar.com siden foar de lêste 48 oeren.

Kinesis-agent, ynstalleare op 'e produsearjende masine, ûntfongen fia de API sil automatysk gegevens analysearje en ferstjoere nei de winske stream fia Kinesis Data Analytics. De rauwe ferzje fan dizze stream sil direkt nei de winkel skreaun wurde. De rûge gegevens opslach ynset yn DynamoDB sil djipper ticketanalyse mooglik meitsje fia BI-ark, lykas AWS Quick Sight.

Wy sille twa opsjes beskôgje foar it ynsetten fan 'e heule ynfrastruktuer:

  • Hânlieding - fia AWS Management Console;
  • Ynfrastruktuer út Terraform koade is foar lui automators;

Arsjitektuer fan it ûntwikkele systeem

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Gebrûkte komponinten:

  • Aviasales API - de gegevens weromjûn troch dizze API sille wurde brûkt foar alle folgjende wurk;
  • EC2 Producer Instance - in gewoane firtuele masine yn 'e wolk wêrop de ynfiergegevensstream sil wurde generearre:
    • Kinesis Agent is in Java-applikaasje ynstalleare lokaal op 'e masine dy't in maklike manier leveret om gegevens te sammeljen en te stjoeren nei Kinesis (Kinesis Data Streams of Kinesis Firehose). De agint kontrolearret konstant in set bestannen yn 'e oantsjutte mappen en stjoert nije gegevens nei Kinesis;
    • API Caller Script - In Python-skript dat oanfragen oan 'e API makket en it antwurd pleatst yn in map dy't wurdt kontrolearre troch de Kinesis Agent;
  • Kinesis Data Streams - realtime datastreamingtsjinst mei brede skaalmooglikheden;
  • Kinesis Analytics is in serverless tsjinst dy't simplifies de analyze fan streaming gegevens yn real time. Amazon Kinesis Data Analytics konfigurearret applikaasje-boarnen en skalen automatysk om elke folume fan ynkommende gegevens te behanneljen;
  • AWS Lambda - in tsjinst wêrmei jo koade kinne útfiere sûnder in reservekopy te meitsjen of servers yn te stellen. Alle kompjûterkrêft wurdt automatysk skale foar elke oprop;
  • Amazon DynamoDB - In databank fan kaai-wearde-pearen en dokuminten dy't wachttiid leveret fan minder dan 10 millisekonden by it rinnen op elke skaal. By it brûken fan DynamoDB hoege jo gjin servers te leverjen, te patchjen of te behearjen. DynamoDB skale automatysk tabellen om it bedrach fan beskikbere boarnen oan te passen en hege prestaasjes te behâlden. Gjin systeem administraasje is nedich;
  • Amazon SNS - in folslein beheare tsjinst foar it ferstjoeren fan berjochten mei it model fan 'e publisher-abonnee (Pub / Sub) wêrmei jo mikrotsjinsten, ferspraat systemen en serverless applikaasjes kinne isolearje. SNS kin brûkt wurde om ynformaasje te stjoeren nei ein brûkers fia mobile push-notifikaasjes, SMS-berjochten en e-mails.

Initial training

Om de gegevensstream te emulearjen, besleat ik de ynformaasje oer loftlinekaarten te brûken weromjûn troch de Aviasales API. YN dokumintaasje nochal in wiidweidige list fan ferskillende metoaden, lit ús nimme ien fan harren - "Moanlikse Priis Calendar", dy't jout prizen foar elke dei fan 'e moanne, groepearre troch it oantal oerstappen. As jo ​​de sykmoanne net oantsjutte yn it fersyk, sil ynformaasje weromjûn wurde foar de moanne nei de aktuele.

Dat, lit ús registrearje en ús token krije.

In foarbyldfersyk is hjirûnder:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

De boppesteande metoade foar it ûntfangen fan gegevens fan 'e API troch it opjaan fan in token yn it fersyk sil wurkje, mar ik leaver it tagongstoken troch de koptekst troch te jaan, dus wy sille dizze metoade brûke yn it api_caller.py-skript.

Foarbyld fan antwurd:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

It foarbyld API-antwurd hjirboppe toant in kaartsje fan Sint Petersburch nei Phuk ... Oh, wat in dream ...
Sûnt ik bin út Kazan, en Phuket is no "allinnich in dream", lit ús sykje kaartsjes út Sint Petersburch nei Kazan.

It giet derfan út dat jo al in AWS-akkount hawwe. Ik wol fuortendaliks spesjaal omtinken jaan oan it feit dat Kinesis en it ferstjoeren fan notifikaasjes fia SMS net opnommen binne yn 'e jierlikse Free Tier (fergees gebrûk). Mar sels nettsjinsteande dit, mei in pear dollar yn gedachten, is it hiel mooglik om it foarstelde systeem te bouwen en dermei te boartsje. En ferjit fansels net alle boarnen te wiskjen nei't se net mear nedich binne.

Gelokkich sille DynamoDb- en lambda-funksjes fergees wêze foar ús as wy foldogge oan ús moanlikse frije grinzen. Bygelyks foar DynamoDB: 25 GB opslach, 25 WCU / RCU en 100 miljoen queries. En in miljoen lambdafunksje-oproppen per moanne.

Hânlieding systeem ynset

Kinesis Data Streams ynstelle

Litte wy nei de Kinesis Data Streams-tsjinst gean en twa nije streamen meitsje, ien shard foar elk.

Wat is in skerpe?
In shard is de basisgegevensferfier-ienheid fan in Amazon Kinesis-stream. Ien segmint soarget foar oerdracht fan ynfiergegevens mei in snelheid fan 1 MB / s en útfiergegevensferfier mei in snelheid fan 2 MB / s. Ien segmint stipet maksimaal 1000 PUT-yngongen per sekonde. By it meitsjen fan in gegevensstream moatte jo it fereaske oantal segminten opjaan. Jo kinne bygelyks in gegevensstream meitsje mei twa segminten. Dizze gegevensstream sil ynfiergegevensferfier leverje mei 2 MB / s en útfiergegevensferfier op 4 MB / s, en stypje oant 2000 PUT-records per sekonde.

Hoe mear stikken yn jo stream, hoe grutter de trochslach is. Yn prinsipe is dit hoe't streamen wurde skalearre - troch it tafoegjen fan shards. Mar hoe mear shards jo hawwe, hoe heger de priis. Elke shard kostet 1,5 sinten per oere en in ekstra 1.4 sinten foar elke miljoen PUT-ladingsienheden.

Litte wy in nije stream meitsje mei de namme airline_tickets, 1 skerp sil him genôch wêze:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Litte wy no in oare thread meitsje mei de namme spesjale_stream:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld

Produsint opset

Om in taak te analysearjen, is it genôch om in gewoane EC2-eksimplaar te brûken as gegevensprodusint. It hoecht net in krêftige, djoere firtuele masine te wêzen; in plak t2.micro sil it goed dwaan.

Wichtige opmerking: jo moatte bygelyks ôfbylding brûke - Amazon Linux AMI 2018.03.0, it hat minder ynstellingen foar it fluch starten fan de Kinesis Agent.

Gean nei de EC2-tsjinst, meitsje in nije firtuele masine, selektearje de winske AMI mei type t2.micro, dy't opnommen is yn 'e Free Tier:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Om de nij oanmakke firtuele masine te kinnen ynteraksje mei de Kinesis-tsjinst, moat it rjochten krije om dat te dwaan. De bêste manier om dit te dwaan is om in IAM-rol te jaan. Dêrom moatte jo op it skerm Stap 3: Ynstellingsdetails ynstelle, selektearje Meitsje nije IAM Rol:

It meitsjen fan in IAM-rol foar EC2
Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Selektearje yn it finster dat iepent dat wy in nije rol meitsje foar EC2 en gean nei de seksje Permissions:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Mei it brûken fan it trainingsfoarbyld hoege wy net yn te gean yn alle yngewikkeldheden fan korrelige konfiguraasje fan boarnerjochten, dus wy sille it belied selektearje dat foarôf konfigureare is troch Amazon: AmazonKinesisFullAccess en CloudWatchFullAccess.

Lit ús in betsjuttingsfolle namme jaan foar dizze rol, bygelyks: EC2-KinesisStreams-FullAccess. It resultaat moat itselde wêze as werjûn yn 'e foto hjirûnder:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Nei it oanmeitsjen fan dizze nije rol, ferjit it net te heakjen oan it oanmakke firtuele masine-eksimplaar:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Wy feroarje neat oars op dit skerm en gean troch nei de folgjende finsters.

De hurde skiifynstellingen kinne as standert oerlitten wurde, lykas de tags (hoewol't it in goede praktyk is om tags te brûken, jou it eksimplaar teminsten in namme en jouwe de omjouwing oan).

No binne wy ​​​​op 'e stap 6: ljepblêd Feiligensgroep konfigurearje, wêr't jo in nije moatte oanmeitsje of jo besteande Feiligensgroep opjaan, wêrtroch jo kinne ferbine fia ssh (poarte 22) mei it eksimplaar. Selektearje Boarne -> Myn IP dêr en jo kinne it eksimplaar starte.

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Sadree't it oerstapt nei rinnende status, kinne jo besykje te ferbinen mei it fia ssh.

Om mei Kinesis Agent te wurkjen, moatte jo nei suksesfolle ferbining mei de masine de folgjende kommando's ynfiere yn 'e terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Litte wy in map meitsje om API-antwurden op te slaan:

sudo mkdir /var/log/airline_tickets

Foardat jo de agent begjinne, moatte jo de konfiguraasje konfigurearje:

sudo vim /etc/aws-kinesis/agent.json

De ynhâld fan it agent.json-bestân moat der sa útsjen:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As kin sjoen wurde út it konfiguraasjetriem, sil de agint bestannen kontrolearje mei de .log-útwreiding yn 'e /var/log/airline_tickets/-map, se parse en oerdrage nei de airline_tickets-stream.

Wy starte de tsjinst opnij en soargje derfoar dat it op en rint:

sudo service aws-kinesis-agent restart

Litte wy no it Python-skript downloade dat gegevens sil freegje fan 'e API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

It skript api_caller.py freget gegevens fan Aviasales en bewarret it ûntfongen antwurd yn 'e map dy't de Kinesis-agint scant. De ymplemintaasje fan dit skript is frij standert, der is in TicketsApi klasse, it kinne jo asynchronously lûke de API. Wy passe in koptekst mei in token en freegje parameters oan dizze klasse:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Om de juste ynstellingen en funksjonaliteit fan 'e agint te testen, litte wy it skript api_caller.py testje:

sudo ./api_caller.py TOKEN

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
En wy sjogge nei it resultaat fan it wurk yn 'e Agent-logs en op it ljepblêd Monitoring yn' e gegevensstream fan airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Sa't jo sjen kinne, wurket alles en de Kinesis Agent stjoert mei súkses gegevens nei de stream. Litte wy no de konsumint konfigurearje.

Kinesis Data Analytics ynstelle

Litte wy trochgean nei de sintrale komponint fan it heule systeem - meitsje in nije applikaasje yn Kinesis Data Analytics mei de namme kinesis_analytics_airlines_app:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Kinesis Data Analytics kinne jo realtime gegevensanalytyk útfiere fan Kinesis Streams mei de SQL-taal. It is in folslein autoskalearjende tsjinst (oars as Kinesis Streams) dat:

  1. kinne jo meitsje nije streamen (Output Stream) basearre op fersiken om boarne gegevens;
  2. jout in stream mei flaters dy't barde wylst applikaasjes rinne (Flater Stream);
  3. kin automatysk bepale de ynfier gegevens skema (it kin wurde manuell op 'e nij definiearre as it nedich is).

Dit is gjin goedkeape tsjinst - 0.11 USD per oere wurk, dus jo moatte it foarsichtich brûke en wiskje as jo klear binne.

Litte wy de applikaasje ferbine mei de gegevensboarne:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Selektearje de stream wêrmei wy ferbine sille (airline_tickets):

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Dêrnei moatte jo in nije IAM-rol taheakje, sadat de applikaasje kin lêze fan 'e stream en skriuwe nei de stream. Om dit te dwaan is it genôch om neat te feroarjen yn it blok tagongsrjochten:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Litte wy no ûntdekking oanfreegje fan it gegevensskema yn 'e stream; Klikje om dit te dwaan op de knop "Skema ûntdekke". As gefolch sil de IAM-rol bywurke wurde (in nije sil oanmakke wurde) en skemadeteksje sil wurde lansearre fanút de gegevens dy't al yn 'e stream binne oankommen:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
No moatte jo nei de SQL-bewurker gean. As jo ​​op dizze knop klikke, sil in finster ferskine dat jo freget de applikaasje te starten - selektearje wat jo wolle starte:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Foegje de folgjende ienfâldige query yn it SQL-bewurkerfinster yn en klikje op Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Yn relasjonele databases wurkje jo mei tabellen mei INSERT-útspraken om records ta te foegjen en in SELECT-útspraak om gegevens te freegjen. Yn Amazon Kinesis Data Analytics wurkje jo mei streamen (STREAM's) en pompen (PUMP's) - trochgeande ynfoegje oanfragen dy't gegevens fan ien stream yn in applikaasje ynfoegje yn in oare stream.

De hjirboppe presintearre SQL-query siket nei Aeroflot-kaarten foar in kosten fan minder dan fiif tûzen roebel. Alle records dy't foldogge oan dizze betingsten sille wurde pleatst yn 'e DESTINATION_SQL_STREAM-stream.

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Selektearje yn it bestimmingsblok de special_stream stream, en yn 'e yn-applikaasje streamnamme DESTINATION_SQL_STREAM drop-down list:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
It resultaat fan alle manipulaasjes moat wat ferlykber wêze mei de ôfbylding hjirûnder:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld

In SNS-ûnderwerp oanmeitsje en ynskriuwe

Gean nei de Simple Notification Service en meitsje dêr in nij ûnderwerp mei de namme Airlines:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Abonnearje op dit ûnderwerp en jou it mobile tillefoannûmer oan wêrnei't SMS-notifikaasjes sille wurde ferstjoerd:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld

Meitsje in tabel yn DynamoDB

Om de rauwe gegevens fan har airline_tickets-stream op te slaan, litte wy in tabel meitsje yn DynamoDB mei deselde namme. Wy sille record_id brûke as de primêre kaai:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld

It meitsjen fan in lambda funksje samler

Litte wy in lambda-funksje oanmeitsje mei de namme Collector, waans taak sil wêze om de stream fan airline_tickets te ûndersiikjen en, as der nije records wurde fûn, dizze records ynfoegje yn 'e DynamoDB-tabel. Fansels moat dizze lambda, neist de standertrjochten, lêstagong hawwe ta de Kinesis-gegevensstream en skriuwtagong ta DynamoDB.

It meitsjen fan in IAM-rol foar de samler lambda-funksje
Litte wy earst in nije IAM-rol meitsje foar de lambda mei de namme Lambda-TicketsProcessingRole:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Foar it testfoarbyld binne it foarôf ynstelde AmazonKinesisReadOnlyAccess- en AmazonDynamoDBFullAccess-belied frij geskikt, lykas werjûn yn 'e ôfbylding hjirûnder:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld

Dizze lambda moat wurde lansearre troch in trigger fan Kinesis as nije yngongen de airline_stream ynfiere, dus moatte wy in nije trigger tafoegje:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Alles wat oerbliuwt is de koade yn te foegjen en de lambda op te slaan.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

It meitsjen fan in notifier foar lambdafunksje

De twadde lambda-funksje, dy't de twadde stream (special_stream) kontrolearret en in notifikaasje nei SNS stjoert, wurdt op in fergelykbere manier makke. Dêrom moat dizze lambda tagong hawwe om te lêzen fan Kinesis en berjochten te stjoeren nei in opjûne SNS-ûnderwerp, dy't dan troch de SNS-tsjinst stjoerd wurde nei alle abonnees fan dit ûnderwerp (e-post, SMS, ensfh.).

It meitsjen fan in IAM-rol
Earst meitsje wy de IAM-rol Lambda-KinesisAlarm foar dizze lambda, en jouwe dizze rol dan ta oan de alarm_notifier lambda dy't wurdt makke:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld

Dizze lambda moat wurkje oan in trigger foar nije records om de special_stream yn te gean, dus jo moatte de trigger op deselde manier konfigurearje as wy diene foar de Collector lambda.

Om it makliker te meitsjen om dizze lambda te konfigurearjen, litte wy in nije omjouwingsfariabele yntrodusearje - TOPIC_ARN, wêr't wy de ANR (Amazon Recourse Names) fan it Airlines-ûnderwerp pleatse:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
En foegje de lambda-koade yn, it is hielendal net yngewikkeld:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

It liket derop dat dit is wêr't de hânmjittige systeemkonfiguraasje is foltôge. Alles wat oerbliuwt is te testen en te soargjen dat wy alles goed hawwe ynsteld.

Ynsette fan Terraform koade

Ferplichte tarieding

Terraform is in heul handich iepen boarne-ark foar it ynsetten fan ynfrastruktuer fan koade. It hat in eigen syntaksis dy't maklik te learen is en hat in protte foarbylden fan hoe en wat te ynsetten. De Atom-bewurker as Visual Studio Code hat in protte handige plugins dy't it wurkjen mei Terraform makliker meitsje.

Jo kinne download de distribúsje fan hjir. In detaillearre analyze fan alle Terraform-mooglikheden is bûten it berik fan dit artikel, dus wy sille ús beheine ta de haadpunten.

Hoe begjinne

De folsleine koade fan it projekt is yn myn repository. Wy klonearje it repository foar ússels. Foardat jo begjinne, moatte jo derfoar soargje dat jo AWS CLI ynstalleare en konfigureare hawwe, om't ... Terraform sil sykje nei bewiisbrieven yn it ~/.aws/credentials-bestân.

In goede praktyk is om it plankommando út te fieren foardat de heule ynfrastruktuer ynset wurdt om te sjen wat Terraform op it stuit foar ús yn 'e wolk makket:

terraform.exe plan

Jo wurde frege om in telefoannûmer yn te fieren om notifikaasjes nei te stjoeren. It is net nedich om it yn dit stadium yn te fieren.

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Nei it analysearjen fan it operaasjeplan fan it programma, kinne wy ​​​​begjinne mei it meitsjen fan boarnen:

terraform.exe apply

Nei it ferstjoeren fan dit kommando wurde jo opnij frege om in telefoannûmer yn te fieren; skilje "ja" as in fraach oer it feitlik útfieren fan de aksjes wurdt werjûn. Hjirmei kinne jo de heule ynfrastruktuer ynstelle, alle nedige konfiguraasje fan EC2 útfiere, lambda-funksjes ynsette, ensfh.

Nei't alle boarnen mei sukses makke binne troch de Terraform-koade, moatte jo yn 'e details fan' e Kinesis Analytics-applikaasje gean (spitigernôch haw ik net fûn hoe't jo dit direkt fan 'e koade kinne dwaan).

Starte de applikaasje:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Hjirnei moatte jo de namme fan 'e stream yn' e applikaasje eksplisyt ynstelle troch te selektearjen út 'e dellûklist:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
No is alles klear om te gean.

Testje de applikaasje

Nettsjinsteande hoe't jo it systeem ynset hawwe, mei de hân of fia Terraform-koade, sil it itselde wurkje.

Wy ynlogge fia SSH nei de EC2 firtuele masine dêr't Kinesis Agent is ynstallearre en rinne it api_caller.py skript

sudo ./api_caller.py TOKEN

Alles wat jo hoege te dwaan is wachtsje op in SMS nei jo nûmer:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
SMS - in berjocht komt yn hast 1 minút op 'e telefoan:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld
It bliuwt om te sjen oft de records waarden bewarre yn 'e DynamoDB-database foar folgjende, mear detaillearre analyse. De tabel foar airline_tickets befettet sawat de folgjende gegevens:

Aviasales API yntegraasje mei Amazon Kinesis en serverless ienfâld

konklúzje

Yn 'e rin fan it dien wurk waard in online gegevensferwurkingssysteem boud basearre op Amazon Kinesis. Opsjes foar it brûken fan de Kinesis Agent yn gearhing mei Kinesis Data Streams en real-time analytics Kinesis Analytics mei SQL-kommando's, lykas de ynteraksje fan Amazon Kinesis mei oare AWS-tsjinsten waarden beskôge.

Wy hawwe it boppesteande systeem op twa manieren ynset: in frij lange hânlieding en in flugge fan 'e Terraform-koade.

Alle projekt boarne koade is beskikber yn myn GitHub-repository, Ik stel foar dat jo jo dermei fertroud meitsje.

Ik bin bliid om it artikel te besprekken, ik sjoch út nei jo opmerkings. Ik hoopje op konstruktive krityk.

Ik winskje jo súkses!

Boarne: www.habr.com

Add a comment