Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd

Hei Habr!

Ydych chi'n hoffi hedfan awyrennau? Rwyf wrth fy modd, ond yn ystod hunan-ynysu fe wnes i hefyd syrthio mewn cariad â dadansoddi data ar docynnau awyr o un adnodd adnabyddus - Aviasales.

Heddiw, byddwn yn dadansoddi gwaith Amazon Kinesis, yn adeiladu system ffrydio gyda dadansoddeg amser real, yn gosod cronfa ddata Amazon DynamoDB NoSQL fel y prif storfa ddata, ac yn sefydlu hysbysiadau SMS ar gyfer tocynnau diddorol.

Mae'r holl fanylion o dan y toriad! Ewch!

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd

Cyflwyniad

Er enghraifft, mae angen mynediad i ni Aviasales API. Darperir mynediad iddo yn rhad ac am ddim a heb gyfyngiadau; y cyfan sydd angen i chi ei wneud yw cofrestru yn yr adran “Datblygwyr” i dderbyn eich tocyn API i gael mynediad at y data.

Prif bwrpas yr erthygl hon yw rhoi dealltwriaeth gyffredinol o'r defnydd o ffrydio gwybodaeth yn AWS; rydym yn cymryd i ystyriaeth nad yw'r data a ddychwelwyd gan yr API a ddefnyddir yn gwbl gyfredol a'i fod yn cael ei drosglwyddo o'r storfa, sef a ffurfiwyd yn seiliedig ar chwiliadau gan ddefnyddwyr y gwefannau Aviasales.ru a Jetradar.com am y 48 awr ddiwethaf.

Bydd asiant Kinesis, wedi'i osod ar y peiriant cynhyrchu, a dderbynnir trwy'r API yn dosrannu'n awtomatig ac yn trosglwyddo data i'r ffrwd a ddymunir trwy Kinesis Data Analytics. Bydd fersiwn amrwd y ffrwd hon yn cael ei ysgrifennu'n uniongyrchol i'r siop. Bydd y storfa data crai a ddefnyddir yn DynamoDB yn caniatáu dadansoddiad dyfnach o docynnau trwy offer BI, megis Golwg Cyflym AWS.

Byddwn yn ystyried dau opsiwn ar gyfer defnyddio’r seilwaith cyfan:

  • Llawlyfr - trwy Consol Rheoli AWS;
  • Mae seilwaith o god Terraform ar gyfer awtomeiddwyr diog;

Pensaernïaeth y system ddatblygedig

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Cydrannau a ddefnyddir:

  • Aviasales API — bydd y data a ddychwelir gan yr API hwn yn cael ei ddefnyddio ar gyfer yr holl waith dilynol;
  • Enghraifft Cynhyrchydd EC2 — peiriant rhithwir rheolaidd yn y cwmwl lle bydd y llif data mewnbwn yn cael ei gynhyrchu:
    • Asiant Kinesis yn gymhwysiad Java sydd wedi'i osod yn lleol ar y peiriant sy'n darparu ffordd hawdd o gasglu ac anfon data i Kinesis (Kinesis Data Streams neu Kinesis Firehose). Mae'r asiant yn monitro set o ffeiliau yn gyson yn y cyfeiriaduron penodedig ac yn anfon data newydd i Kinesis;
    • Sgript Galwr API — Sgript Python sy'n gwneud ceisiadau i'r API ac yn rhoi'r ymateb mewn ffolder sy'n cael ei fonitro gan yr Asiant Kinesis;
  • Ffrydiau Data Kinesis — gwasanaeth ffrydio data amser real gyda galluoedd graddio eang;
  • Dadansoddeg Kinesis yn wasanaeth di-weinydd sy'n symleiddio'r dadansoddiad o ddata ffrydio mewn amser real. Mae Amazon Kinesis Data Analytics yn ffurfweddu adnoddau cymhwysiad ac yn graddio'n awtomatig i drin unrhyw gyfaint o ddata sy'n dod i mewn;
  • AWS Lambda — gwasanaeth sy'n eich galluogi i redeg cod heb wneud copi wrth gefn na sefydlu gweinyddwyr. Mae'r holl bŵer cyfrifiadurol yn cael ei raddio'n awtomatig ar gyfer pob galwad;
  • DynamoDB Amazon - Cronfa ddata o barau a dogfennau gwerth allweddol sy'n darparu hwyrni o lai na 10 milieiliad wrth redeg ar unrhyw raddfa. Wrth ddefnyddio DynamoDB, nid oes angen i chi ddarparu, clytio na rheoli unrhyw weinyddion. Mae DynamoDB yn graddio tablau yn awtomatig i addasu faint o adnoddau sydd ar gael a chynnal perfformiad uchel. Nid oes angen gweinyddiaeth system;
  • Amazon SNS - gwasanaeth a reolir yn llawn ar gyfer anfon negeseuon gan ddefnyddio'r model cyhoeddwr-tanysgrifiwr (Tafarn/Is), y gallwch ynysu microwasanaethau, systemau dosbarthedig a chymwysiadau di-weinydd ag ef. Gellir defnyddio SNS i anfon gwybodaeth at ddefnyddwyr terfynol trwy hysbysiadau gwthio symudol, negeseuon SMS ac e-byst.

Hyfforddiant cychwynnol

Er mwyn efelychu'r llif data, penderfynais ddefnyddio'r wybodaeth tocyn cwmni hedfan a ddychwelwyd gan API Aviasales. YN dogfennaeth rhestr eithaf helaeth o wahanol ddulliau, gadewch i ni gymryd un ohonynt - "Calendr Prisiau Misol", sy'n dychwelyd prisiau ar gyfer pob diwrnod o'r mis, wedi'u grwpio yn ôl nifer y trosglwyddiadau. Os na fyddwch yn nodi'r mis chwilio yn y cais, bydd gwybodaeth yn cael ei dychwelyd ar gyfer y mis sy'n dilyn yr un cyfredol.

Felly, gadewch i ni gofrestru a chael ein tocyn.

Mae cais enghreifftiol isod:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Bydd y dull uchod o dderbyn data o'r API trwy nodi tocyn yn y cais yn gweithio, ond mae'n well gennyf basio'r tocyn mynediad trwy'r pennawd, felly byddwn yn defnyddio'r dull hwn yn y sgript api_caller.py.

Enghraifft ateb:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Mae'r ymateb API enghreifftiol uchod yn dangos tocyn o St. Petersburg i Phuk... O, am freuddwyd...
Gan fy mod yn dod o Kazan, a Phuket bellach yn “breuddwyd yn unig”, gadewch i ni edrych am docynnau o St Petersburg i Kazan.

Mae'n cymryd yn ganiataol bod gennych chi gyfrif AWS eisoes. Hoffwn dynnu sylw arbennig ar unwaith at y ffaith nad yw Kinesis ac anfon hysbysiadau trwy SMS wedi'u cynnwys yn y cyfarfod blynyddol. Haen Rhad ac Am Ddim (defnydd am ddim). Ond hyd yn oed er gwaethaf hyn, gyda chwpl o ddoleri mewn golwg, mae'n eithaf posibl adeiladu'r system arfaethedig a chwarae ag ef. Ac, wrth gwrs, peidiwch ag anghofio dileu'r holl adnoddau ar ôl nad oes eu hangen mwyach.

Yn ffodus, bydd swyddogaethau DynamoDb a lambda yn rhad ac am ddim i ni os ydym yn cwrdd â'n terfynau rhad ac am ddim misol. Er enghraifft, ar gyfer DynamoDB: 25 GB o storfa, 25 WCU / RCU a 100 miliwn o ymholiadau. Ac mae miliwn o alwadau swyddogaeth lambda y mis.

Defnydd system â llaw

Sefydlu Ffrydiau Data Kinesis

Gadewch i ni fynd i wasanaeth Ffrydiau Data Kinesis a chreu dwy ffrwd newydd, un darn i bob un.

Beth yw shard?
Shard yw uned trosglwyddo data sylfaenol ffrwd Kinesis Amazon. Mae un segment yn darparu trosglwyddiad data mewnbwn ar gyflymder o 1 MB/s a throsglwyddiad data allbwn ar gyflymder o 2 MB/s. Mae un segment yn cefnogi hyd at 1000 o gofnodion PUT yr eiliad. Wrth greu llif data, mae angen i chi nodi'r nifer gofynnol o segmentau. Er enghraifft, gallwch greu llif data gyda dwy segment. Bydd y ffrwd ddata hon yn darparu trosglwyddiad data mewnbwn ar 2 MB/s a throsglwyddiad data allbwn ar 4 MB/s, gan gefnogi hyd at 2000 o gofnodion PUT yr eiliad.

Po fwyaf o ddarnau yn eich nant, y mwyaf yw ei drwybwn. Mewn egwyddor, dyma sut mae llifoedd yn cael eu graddio - trwy ychwanegu darnau. Ond po fwyaf o ddarnau sydd gennych chi, yr uchaf yw'r pris. Mae pob darn yn costio 1,5 cents yr awr ac 1.4 cents ychwanegol am bob miliwn o unedau llwyth tâl PUT.

Gadewch i ni greu ffrwd newydd gyda'r enw tocynnau_cwmni hedfan, Bydd 1 shard yn ddigon iddo :

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Nawr, gadewch i ni greu edefyn arall gyda'r enw ffrwd_arbennig:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd

Gosodiad cynhyrchydd

I ddadansoddi tasg, mae'n ddigon defnyddio enghraifft EC2 rheolaidd fel cynhyrchydd data. Nid oes rhaid iddo fod yn beiriant rhithwir pwerus, drud; bydd spot t2.micro yn gwneud yn iawn.

Nodyn pwysig: er enghraifft, dylech ddefnyddio delwedd - Amazon Linux AMI 2018.03.0, mae ganddo lai o leoliadau ar gyfer lansio'r Asiant Kinesis yn gyflym.

Ewch i'r gwasanaeth EC2, crëwch beiriant rhithwir newydd, dewiswch yr AMI a ddymunir gyda math t2.micro, sydd wedi'i gynnwys yn yr Haen Rhad ac Am Ddim:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Er mwyn i'r peiriant rhithwir sydd newydd ei greu allu rhyngweithio â gwasanaeth Kinesis, rhaid rhoi hawliau iddo wneud hynny. Y ffordd orau o wneud hyn yw neilltuo Rôl IAM. Felly, ar y sgrin Cam 3: Ffurfweddu Manylion Instance, dylech ddewis Creu Rôl IAM newydd:

Creu rôl IAM ar gyfer EC2
Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Yn y ffenestr sy'n agor, dewiswch ein bod yn creu rôl newydd ar gyfer EC2 ac ewch i'r adran Caniatâd:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Gan ddefnyddio'r enghraifft hyfforddi, nid oes yn rhaid i ni fynd i mewn i holl gymhlethdodau cyfluniad gronynnog hawliau adnoddau, felly byddwn yn dewis y polisïau sydd wedi'u ffurfweddu ymlaen llaw gan Amazon: AmazonKinesisFullAccess a CloudWatchFullAccess.

Gadewch i ni roi enw ystyrlon i'r rôl hon, er enghraifft: EC2-KinesisStreams-FullAccess. Dylai'r canlyniad fod yr un peth â'r hyn a ddangosir yn y llun isod:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Ar ôl creu'r rôl newydd hon, peidiwch ag anghofio ei hatodi i'r enghraifft peiriant rhithwir a grëwyd:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Nid ydym yn newid unrhyw beth arall ar y sgrin hon ac yn symud ymlaen i'r ffenestri nesaf.

Gellir gadael gosodiadau'r gyriant caled fel rhagosodiad, yn ogystal â'r tagiau (er ei bod yn arfer da defnyddio tagiau, o leiaf rhowch enw i'r enghraifft a nodwch yr amgylchedd).

Nawr rydyn ni ar y tab Cam 6: Ffurfweddu Grŵp Diogelwch, lle mae angen i chi greu un newydd neu nodi'ch grŵp Diogelwch presennol, sy'n eich galluogi i gysylltu trwy ssh (porthladd 22) â'r enghraifft. Dewiswch Ffynhonnell -> Fy IP yno a gallwch chi lansio'r enghraifft.

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Cyn gynted ag y bydd yn newid i statws rhedeg, gallwch geisio cysylltu ag ef trwy ssh.

Er mwyn gallu gweithio gydag Asiant Kinesis, ar ôl cysylltu'n llwyddiannus â'r peiriant, rhaid i chi nodi'r gorchmynion canlynol yn y derfynell:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Gadewch i ni greu ffolder i arbed ymatebion API:

sudo mkdir /var/log/airline_tickets

Cyn cychwyn yr asiant, mae angen i chi ffurfweddu ei gyfluniad:

sudo vim /etc/aws-kinesis/agent.json

Dylai cynnwys y ffeil agent.json edrych fel hyn:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Fel y gwelir o'r ffeil ffurfweddu, bydd yr asiant yn monitro ffeiliau gyda'r estyniad .log yn y cyfeiriadur /var/log/airline_tickets/, eu dosrannu a'u trosglwyddo i'r ffrwd airline_tickets.

Rydym yn ailgychwyn y gwasanaeth ac yn gwneud yn siŵr ei fod ar waith:

sudo service aws-kinesis-agent restart

Nawr, gadewch i ni lawrlwytho'r sgript Python a fydd yn gofyn am ddata o'r API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Mae'r sgript api_caller.py yn gofyn am ddata gan Aviasales ac yn arbed yr ymateb a dderbyniwyd yn y cyfeiriadur y mae'r asiant Kinesis yn ei sganio. Mae gweithredu'r sgript hon yn eithaf safonol, mae dosbarth TicketsApi, mae'n caniatáu ichi dynnu'r API yn asyncronig. Rydyn ni'n pasio pennawd gyda thocyn ac yn gofyn am baramedrau i'r dosbarth hwn:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

I brofi gosodiadau ac ymarferoldeb cywir yr asiant, gadewch i ni roi prawf ar redeg y sgript api_caller.py:

sudo ./api_caller.py TOKEN

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Ac rydym yn edrych ar ganlyniad y gwaith yn y logiau Asiant ac ar y tab Monitro yn y ffrwd data airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Fel y gallwch weld, mae popeth yn gweithio ac mae'r Asiant Kinesis yn anfon data i'r ffrwd yn llwyddiannus. Nawr, gadewch i ni ffurfweddu defnyddiwr.

Sefydlu Kinesis Data Analytics

Gadewch i ni symud ymlaen i gydran ganolog y system gyfan - creu cymhwysiad newydd yn Kinesis Data Analytics o'r enw kinesis_analytics_airlines_app:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Mae Kinesis Data Analytics yn caniatáu ichi berfformio dadansoddeg data amser real o Kinesis Streams gan ddefnyddio'r iaith SQL. Mae'n wasanaeth graddio awtomatig (yn wahanol i Kinesis Streams) sy'n:

  1. yn eich galluogi i greu ffrydiau newydd (Allbwn Stream) yn seiliedig ar geisiadau i ddod o hyd i ddata;
  2. yn darparu ffrwd gyda gwallau a ddigwyddodd tra bod ceisiadau yn rhedeg (Ffrwd Gwallau);
  3. yn gallu pennu'r cynllun data mewnbwn yn awtomatig (gellir ei ailddiffinio â llaw os oes angen).

Nid yw hwn yn wasanaeth rhad - 0.11 USD yr awr o waith, felly dylech ei ddefnyddio'n ofalus a'i ddileu pan fyddwch wedi gorffen.

Gadewch i ni gysylltu'r cais â'r ffynhonnell ddata:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Dewiswch y ffrwd rydyn ni'n mynd i gysylltu ag ef (airline_tickets):

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Nesaf, mae angen i chi atodi Rôl IAM newydd fel y gall y cais ddarllen o'r ffrwd ac ysgrifennu at y nant. I wneud hyn, mae'n ddigon peidio â newid unrhyw beth yn y bloc caniatâd Mynediad:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Nawr, gadewch i ni ofyn am ddarganfod y sgema data yn y ffrwd; i wneud hyn, cliciwch ar y botwm “Darganfod sgema”. O ganlyniad, bydd rôl IAM yn cael ei diweddaru (bydd un newydd yn cael ei chreu) a bydd canfod sgema yn cael ei lansio o'r data sydd eisoes wedi cyrraedd y ffrwd:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Nawr mae angen i chi fynd at y golygydd SQL. Pan fyddwch chi'n clicio ar y botwm hwn, bydd ffenestr yn ymddangos yn gofyn ichi lansio'r rhaglen - dewiswch yr hyn rydych chi am ei lansio:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Mewnosodwch yr ymholiad syml canlynol yn ffenestr golygydd SQL a chliciwch ar Cadw a Rhedeg SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Mewn cronfeydd data perthynol, rydych yn gweithio gyda thablau gan ddefnyddio datganiadau INSERT i ychwanegu cofnodion a datganiad SELECT i ymholi am ddata. Yn Amazon Kinesis Data Analytics, rydych chi'n gweithio gyda ffrydiau (STREAMs) a phympiau (PUMPs) - ceisiadau mewnosod parhaus sy'n mewnosod data o un ffrwd mewn cymhwysiad i ffrwd arall.

Mae'r ymholiad SQL a gyflwynir uchod yn chwilio am docynnau Aeroflot am gost o lai na phum mil o rubles. Bydd yr holl gofnodion sy'n cwrdd â'r amodau hyn yn cael eu gosod yn y ffrwd DESTINATION_SQL_STREAM.

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Yn y bloc Cyrchfan, dewiswch y ffrwd_special_stream, ac yn y gwymplen enw ffrwd Mewn-gymhwysiad DESTINATION_SQL_STREAM:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Dylai canlyniad pob triniaeth fod yn rhywbeth tebyg i'r llun isod:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd

Creu a thanysgrifio i bwnc SNS

Ewch i'r Gwasanaeth Hysbysu Syml a chreu pwnc newydd yno gyda'r enw Airlines:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Tanysgrifiwch i'r pwnc hwn a nodwch y rhif ffôn symudol yr anfonir hysbysiadau SMS ato:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd

Creu tabl yn DynamoDB

I storio'r data crai o'u ffrwd airline_tickets, gadewch i ni greu tabl yn DynamoDB gyda'r un enw. Byddwn yn defnyddio record_id fel y brif allwedd:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd

Creu casglwr swyddogaethau lambda

Gadewch i ni greu swyddogaeth lambda o'r enw Collector, a'i dasg fydd pleidleisio'r ffrwd airline_tickets ac, os canfyddir cofnodion newydd yno, rhowch y cofnodion hyn yn y tabl DynamoDB. Yn amlwg, yn ychwanegol at yr hawliau diofyn, mae'n rhaid i'r lambda hwn gael mynediad darllen i'r ffrwd ddata Kinesis ac ysgrifennu mynediad i DynamoDB.

Creu rôl IAM ar gyfer swyddogaeth lambda casglwr
Yn gyntaf, gadewch i ni greu rôl IAM newydd ar gyfer y lambda o'r enw Lambda-TicketsProcessingRole:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Ar gyfer yr enghraifft brawf, mae'r polisïau AmazonKinesisReadOnlyAccess ac AmazonDynamoDBFullAccess wedi'u ffurfweddu ymlaen llaw yn eithaf addas, fel y dangosir yn y llun isod:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd

Dylai'r lambda hwn gael ei lansio gan sbardun o Kinesis pan fydd cofnodion newydd yn mynd i mewn i'r airline_stream, felly mae angen i ni ychwanegu sbardun newydd:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Y cyfan sydd ar ôl yw mewnosod y cod ac achub y lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Creu hysbyswedd swyddogaeth lambda

Mae'r ail swyddogaeth lambda, a fydd yn monitro'r ail ffrwd (special_stream) ac yn anfon hysbysiad i SNS, yn cael ei greu mewn ffordd debyg. Felly, rhaid i'r lambda hwn gael mynediad i ddarllen o Kinesis ac anfon negeseuon at bwnc SNS penodol, a fydd wedyn yn cael ei anfon gan y gwasanaeth SNS at holl danysgrifwyr y pwnc hwn (e-bost, SMS, ac ati).

Creu rôl IAM
Yn gyntaf, rydym yn creu rôl IAM Lambda-KinesisAlarm ar gyfer y lambda hwn, ac yna'n aseinio'r rôl hon i'r lambda alarm_notifier sy'n cael ei greu:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd

Dylai'r lambda hwn weithio ar sbardun i gofnodion newydd fynd i mewn i'r special_stream, felly mae angen i chi ffurfweddu'r sbardun yn yr un modd ag y gwnaethom ar gyfer y Lambda Casglwr.

Er mwyn ei gwneud hi'n haws ffurfweddu'r lambda hwn, gadewch i ni gyflwyno newidyn amgylchedd newydd - TOPIC_ARN, lle rydyn ni'n gosod yr ANR (Enwau Ail-wneud Amazon) y pwnc Airlines:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
A mewnosodwch y cod lambda, nid yw'n gymhleth o gwbl:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Mae'n ymddangos mai dyma lle mae cyfluniad y system â llaw wedi'i gwblhau. Y cyfan sydd ar ôl yw profi a gwneud yn siŵr ein bod wedi ffurfweddu popeth yn gywir.

Defnyddio o god Terraform

Paratoi angenrheidiol

Terraform yn offeryn ffynhonnell agored cyfleus iawn ar gyfer defnyddio seilwaith o god. Mae ganddi ei chystrawen ei hun sy'n hawdd ei dysgu ac mae ganddi lawer o enghreifftiau o sut a beth i'w ddefnyddio. Mae gan olygydd Atom neu Visual Studio Code lawer o ategion defnyddiol sy'n ei gwneud hi'n haws gweithio gyda Terraform.

Gallwch chi lawrlwytho'r dosbarthiad felly. Mae dadansoddiad manwl o holl alluoedd Terraform y tu hwnt i gwmpas yr erthygl hon, felly byddwn yn cyfyngu ein hunain i'r prif bwyntiau.

Sut i redeg

Cod llawn y prosiect yw yn fy ystorfa. Rydym yn clonio'r ystorfa i ni ein hunain. Cyn dechrau, mae angen i chi sicrhau bod gennych AWS CLI wedi'i osod a'i ffurfweddu, oherwydd ... Bydd Terraform yn chwilio am gymwysterau yn y ffeil ~/.aws/credentials.

Arfer da yw rhedeg gorchymyn y cynllun cyn defnyddio'r seilwaith cyfan i weld beth mae Terraform yn ei greu i ni yn y cwmwl ar hyn o bryd:

terraform.exe plan

Fe'ch anogir i nodi rhif ffôn i anfon hysbysiadau ato. Nid oes angen mynd i mewn iddo ar hyn o bryd.

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Ar ôl dadansoddi cynllun gweithredu'r rhaglen, gallwn ddechrau creu adnoddau:

terraform.exe apply

Ar ôl anfon y gorchymyn hwn, gofynnir i chi eto nodi rhif ffôn; deialu "ie" pan fydd cwestiwn am gyflawni'r camau gweithredu yn cael ei arddangos. Bydd hyn yn caniatáu ichi sefydlu'r seilwaith cyfan, cyflawni'r holl gyfluniad angenrheidiol o EC2, defnyddio swyddogaethau lambda, ac ati.

Ar ôl i'r holl adnoddau gael eu creu'n llwyddiannus trwy'r cod Terraform, mae angen i chi fynd i mewn i fanylion y cais Kinesis Analytics (yn anffodus, ni wnes i ddarganfod sut i wneud hyn yn uniongyrchol o'r cod).

Lansio'r cais:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Ar ôl hyn, rhaid i chi osod enw'r ffrwd mewn cais yn benodol trwy ddewis o'r gwymplen:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Nawr mae popeth yn barod i fynd.

Profi'r cais

Waeth sut y gwnaethoch chi ddefnyddio'r system, â llaw neu trwy god Terraform, bydd yn gweithio yr un peth.

Rydym yn mewngofnodi trwy SSH i'r peiriant rhithwir EC2 lle mae Asiant Kinesis wedi'i osod ac yn rhedeg y sgript api_caller.py

sudo ./api_caller.py TOKEN

Y cyfan sy'n rhaid i chi ei wneud yw aros am SMS i'ch rhif:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
SMS - mae neges yn cyrraedd eich ffôn mewn bron i 1 munud:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd
Rhaid aros i weld a gafodd y cofnodion eu cadw yng nghronfa ddata DynamoDB ar gyfer dadansoddiad manylach wedi hynny. Mae'r tabl tocynnau_cwmni hedfan yn cynnwys tua'r data canlynol:

Integreiddiad API Aviasales ag Amazon Kinesis a symlrwydd di-weinydd

Casgliad

Yn ystod y gwaith a wnaed, adeiladwyd system prosesu data ar-lein yn seiliedig ar Amazon Kinesis. Ystyriwyd opsiynau ar gyfer defnyddio'r Asiant Kinesis ar y cyd â Ffrydiau Data Kinesis a dadansoddiadau amser real Kinesis Analytics gan ddefnyddio gorchmynion SQL, yn ogystal â rhyngweithio Amazon Kinesis â gwasanaethau AWS eraill.

Fe wnaethom ddefnyddio'r system uchod mewn dwy ffordd: un â llaw eithaf hir ac un cyflym o'r cod Terraform.

Mae holl god ffynhonnell y prosiect ar gael yn fy ystorfa GitHub, Rwy'n awgrymu ichi ymgyfarwyddo ag ef.

Rwy'n hapus i drafod yr erthygl, edrychaf ymlaen at eich sylwadau. Rwy'n gobeithio am feirniadaeth adeiladol.

Rwy'n dymuno llwyddiant i chi!

Ffynhonnell: hab.com

Ychwanegu sylw