Integrating the Aviasales API with Amazon Kinesis and the simplicity of serverless

Hey Habr!

Do you like flying? I love it, but during self-isolation I also fell in love with analyzing airfare data from a well-known source: Aviasales.

Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, set up the Amazon DynamoDB NoSQL database as the main data store, and configure SMS notifications for interesting tickets.

All the details are below. Let's go!


Introduction

For this example we need access to the Aviasales API. Access is free of charge and unrestricted; you just need to register in the "Developers" section to get an API token for accessing the data.

The main purpose of this article is to give a general understanding of streaming data in AWS. Note that the data returned by the API is not strictly up to date: it is served from a cache built from searches made by users of Aviasales.ru and Jetradar.com over the last 48 hours.

The Kinesis Agent installed on the producer machine will automatically parse the data received from the API and send it to the target stream, where Kinesis Data Analytics processes it. The raw version of this stream will be written directly to storage. Keeping the raw data in DynamoDB makes deeper ticket analysis possible with BI tools such as Amazon QuickSight.

We will consider two options for deploying the whole infrastructure:

  • Manual, via the AWS Management Console;
  • Infrastructure from Terraform code, for lazy automators.

Architecture of the system

[Architecture diagram]
Components used:

  • Aviasales API: the data returned by this API is used for all the subsequent work;
  • EC2 Producer Instance: an ordinary virtual machine in the cloud on which the input data stream is generated:
    • Kinesis Agent: a Java application installed locally on the machine that provides an easy way to collect data and send it to Kinesis (Kinesis Data Streams or Kinesis Data Firehose). The agent continuously monitors a set of files in the specified directories and sends new data to Kinesis;
    • API Caller Script: a Python script that makes requests to the API and puts the responses into the folder monitored by the Kinesis Agent;
  • Kinesis Data Streams: a real-time data streaming service with broad scaling capabilities;
  • Kinesis Analytics: a serverless service that simplifies real-time analysis of streaming data. Amazon Kinesis Data Analytics configures application resources and scales automatically to handle any volume of incoming data;
  • AWS Lambda: a service that lets you run code without provisioning or managing servers. Compute capacity scales automatically for each invocation;
  • Amazon DynamoDB: a key-value and document database that delivers latency below 10 milliseconds at any scale. With DynamoDB you don't need to provision, patch, or manage any servers. DynamoDB automatically scales tables to adjust the available capacity and maintain high performance. No system administration is required;
  • Amazon SNS: a fully managed service for sending messages using the publisher-subscriber (Pub/Sub) model, which lets you decouple microservices, distributed systems, and serverless applications. SNS can deliver information to end users via mobile push notifications, SMS messages, and email.

Preparation

To simulate a data flow, I decided to use the airline ticket information returned by the Aviasales API. The documentation lists quite a few methods; let's take one of them, the "Monthly price calendar", which returns prices for each day of the month grouped by the number of transfers. If you don't specify a search month in the request, data is returned for the month following the current one.

So let's register and get our token.

An example request:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Passing the token as a query parameter, as above, works, but I prefer to pass the access token in a request header, so that is the approach used in the api_caller.py script.
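
A minimal standard-library sketch of the header-based call, assuming the month-matrix endpoint from the example request. It is not the author's api_caller.py (which uses aiohttp); build_request and fetch are hypothetical helpers, and the X-Access-Token header name is the one api_caller.py sends. The sketch deliberately omits Accept-Encoding: gzip, because urllib does not decompress responses on its own.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request, urlopen

# Endpoint taken from the example request above
BASE_URL = 'http://api.travelpayouts.com/v2/prices/month-matrix'


def build_request(api_token, origin, destination, currency='rub'):
    """Build a GET request that sends the token in a header instead of the URL."""
    query = urlencode({
        'currency': currency,
        'origin': origin,
        'destination': destination,
        'show_to_affiliates': 'true',
    })
    return Request('{}?{}'.format(BASE_URL, query),
                   headers={'X-Access-Token': api_token})


def fetch(api_token, origin, destination):
    """Send the request and decode the JSON body (needs network access and a real token)."""
    with urlopen(build_request(api_token, origin, destination), timeout=30) as resp:
        return json.loads(resp.read().decode('utf-8'))
```

For example, fetch(token, 'LED', 'KZN') would ask for St. Petersburg to Kazan prices; the token never appears in the URL, so it does not end up in proxy or server access logs.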

Example response:

{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

The API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I'm from Kazan, and Phuket is now "just a dream", let's look for tickets from St. Petersburg to Kazan instead.
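
To make the response structure concrete, here is a small sketch that picks the cheapest item from a response shaped like the example above: tickets live in the data list and the price is in value. The helper cheapest_ticket is hypothetical and not part of the project code.

```python
import json

# The example response from above, as a string
SAMPLE = '''{"success": true, "data": [{"show_to_affiliates": true, "trip_class": 0,
  "origin": "LED", "destination": "HKT", "depart_date": "2015-10-01",
  "return_date": "", "number_of_changes": 1, "value": 29127,
  "found_at": "2015-09-24T00:06:12+04:00", "distance": 8015, "actual": true}]}'''


def cheapest_ticket(response):
    """Return the item with the lowest price, or None if the call failed or returned nothing."""
    if not response.get('success'):
        return None
    items = response.get('data') or []
    return min(items, key=lambda item: item['value'], default=None)
```

Calling cheapest_ticket(json.loads(SAMPLE)) returns the single LED to HKT item priced at 29127.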

I assume you already have an AWS account. I want to point out right away that Kinesis and SMS notifications are not included in the annual Free Tier. Even so, with a couple of dollars to spare, it is entirely possible to build the proposed system and play with it. And, of course, don't forget to delete all resources once you no longer need them.

Fortunately, DynamoDB and Lambda functions will cost us nothing as long as we stay within the monthly free limits. For DynamoDB these are 25 GB of storage, 25 WCU/RCU and 100 million requests; for Lambda, one million function invocations per month.

Manual deployment of the system

Setting up Kinesis Data Streams

Let's go to the Kinesis Data Streams service and create two new streams with one shard each.

What is a shard?
A shard is the base throughput unit of an Amazon Kinesis stream. One shard provides 1 MB/s of data input and 2 MB/s of data output, and supports up to 1,000 PUT records per second. When you create a data stream, you specify the required number of shards. For example, a data stream with two shards provides 2 MB/s of input and 4 MB/s of output and supports up to 2,000 PUT records per second.

The more shards in your stream, the higher its throughput. In principle, this is how streams scale: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus 1.4 cents for every million PUT payload units.
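
The shard arithmetic above can be sketched as follows. The limits and prices are the ones quoted in this article (prices vary by region and over time), the 30-day month of 720 hours is an assumption, and AWS counts one PUT payload unit per 25 KB chunk of a record. All function names are illustrative.

```python
import math

# Per-shard limits and prices quoted in the text above
MB_IN_PER_SHARD = 1.0
MB_OUT_PER_SHARD = 2.0
RECORDS_PER_SHARD = 1000
SHARD_HOUR_USD = 0.015
PUT_MILLION_USD = 0.014


def stream_capacity(shards):
    """Return (MB/s in, MB/s out, PUT records/s) for a stream with this many shards."""
    return (shards * MB_IN_PER_SHARD, shards * MB_OUT_PER_SHARD,
            shards * RECORDS_PER_SHARD)


def shards_needed(mb_in_per_sec, records_per_sec):
    """Smallest shard count that covers both the byte rate and the record rate."""
    return max(1, math.ceil(mb_in_per_sec / MB_IN_PER_SHARD),
               math.ceil(records_per_sec / RECORDS_PER_SHARD))


def monthly_cost(shards, put_units, hours=720):
    """Shard-hours plus PUT payload units, in USD (assumes a 30-day month)."""
    return shards * hours * SHARD_HOUR_USD + put_units / 1_000_000 * PUT_MILLION_USD
```

A single always-on shard therefore costs about 10.8 USD a month before PUT charges, and 1,500 records per second already needs two shards even if the byte rate is low.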

Let's create a new stream named airline_tickets; one shard will be enough for it:

[Screenshot]
Now let's create another stream named special_stream:

[Screenshot]
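
The same two streams can also be created from code. A minimal sketch, assuming boto3 with configured credentials: it takes any Kinesis client, calls create_stream for each name, then waits with the stream_exists waiter until each stream is ACTIVE. The function name create_streams is hypothetical.

```python
def create_streams(kinesis, names=('airline_tickets', 'special_stream'), shard_count=1):
    """Create each stream with the given shard count and wait until it is ACTIVE."""
    for name in names:
        kinesis.create_stream(StreamName=name, ShardCount=shard_count)
    # The stream_exists waiter polls DescribeStream until the status is ACTIVE
    waiter = kinesis.get_waiter('stream_exists')
    for name in names:
        waiter.wait(StreamName=name)
    return list(names)
```

Usage: create_streams(boto3.client('kinesis')). Passing the client in, rather than creating it inside, keeps the function easy to test without touching AWS.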

Setting up the producer

For this task, a regular EC2 instance is enough as the data producer. It doesn't have to be a powerful, expensive virtual machine; a t2.micro spot instance will do just fine.

Important note: for this example, use the Amazon Linux AMI 2018.03.0 image; it needs less setup to get the Kinesis Agent running quickly.

Go to the EC2 service, create a new virtual machine, and select this AMI with the t2.micro instance type, which is included in the Free Tier:

[Screenshot]
For the newly created virtual machine to interact with the Kinesis service, it must be granted the rights to do so. The best way to do this is to assign it an IAM role. So on the Step 3: Configure Instance Details screen, select Create new IAM role:

Creating an IAM role for EC2
[Screenshot]
In the window that opens, choose to create a new role for EC2 and go to the Permissions section:

[Screenshot]
Since this is a training example, we don't need to go into all the intricacies of fine-grained resource permissions, so we'll choose the policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.

Let's give this role a meaningful name, for example EC2-KinesisStreams-FullAccess. The result should match the image below:

[Screenshot]
After creating the role, don't forget to attach it to the virtual machine instance you are creating:

[Screenshot]
We don't change anything else on this screen and move on to the next ones.
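
For reference, the role the console builds here can be sketched with the IAM API. This is a sketch, not the console's exact behavior: it assumes an IAM client such as boto3.client('iam'), creates the role with an EC2 trust policy, attaches the two AWS managed policies, and creates an instance profile, which is how EC2 actually receives a role (the console creates one implicitly). The function name is hypothetical.

```python
import json

# Trust policy that lets EC2 instances assume the role
EC2_TRUST_POLICY = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'Service': 'ec2.amazonaws.com'},
        'Action': 'sts:AssumeRole',
    }],
}
MANAGED_POLICIES = [
    'arn:aws:iam::aws:policy/AmazonKinesisFullAccess',
    'arn:aws:iam::aws:policy/CloudWatchFullAccess',
]


def create_ec2_role(iam, role_name='EC2-KinesisStreams-FullAccess'):
    """Create the role, attach the managed policies, and wrap it in an instance profile."""
    iam.create_role(RoleName=role_name,
                    AssumeRolePolicyDocument=json.dumps(EC2_TRUST_POLICY))
    for arn in MANAGED_POLICIES:
        iam.attach_role_policy(RoleName=role_name, PolicyArn=arn)
    # EC2 attaches roles through an instance profile with the same name
    iam.create_instance_profile(InstanceProfileName=role_name)
    iam.add_role_to_instance_profile(InstanceProfileName=role_name, RoleName=role_name)
    return role_name
```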

The storage settings can be left at their defaults, as can the tags (although using tags is good practice, at least to give the instance a name and indicate the environment).

Now we are on the Step 6: Configure Security Group tab, where you need to create a new security group, or choose an existing one, that allows connecting to the instance over ssh (port 22). Select Source -> My IP there, and you can launch the instance.

[Screenshot]
As soon as the instance reaches the running state, you can try to connect to it over ssh.

To work with the Kinesis Agent, once you have connected to the machine, run the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Let's create a folder to store the API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to configure it:

sudo vim /etc/aws-kinesis/agent.json

The agent.json file should look like this:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As the configuration file shows, the agent will monitor files with the .log extension in the /var/log/airline_tickets/ directory, parse them, and send them to the airline_tickets stream.
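
Roughly speaking, the CSVTOJSON option pairs each value of a comma-separated line with the name at the same position in customFieldNames, so the column order written by the producer must match that list exactly. A local emulation, assuming plain comma-separated lines with no quoted commas (this is not the agent's actual code):

```python
import json

# Same order as "customFieldNames" in agent.json
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date", "origin",
               "number_of_changes", "gate", "found_at", "duration", "distance",
               "destination", "depart_date", "actual", "record_id"]


def csv_line_to_json(line, field_names=FIELD_NAMES, delimiter=','):
    """Pair each value with the field name at the same position and emit JSON."""
    values = line.rstrip('\r\n').split(delimiter)
    return json.dumps(dict(zip(field_names, values)))
```

Note that every value comes out as a string, which is why the Collector lambda later in the article converts cost, duration and the other numeric fields explicitly.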

Restart the service and make sure it is up and running:

sudo service aws-kinesis-agent restart

Now let's download the Python script that will request data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and saves the response in the directory scanned by the Kinesis Agent. The implementation is fairly standard: a TicketsApi class lets you call the API asynchronously. We pass this class a header with the token and the request parameters:

import asyncio
import logging
import sys

from aiohttp import ClientResponseError, ClientSession, FormData

# BASE_URL, CURRENCY, ORIGIN, DESTINATION, SHOW_TO_AFFILIATES, TRIP_DURATION,
# TARGET_FILE, LOGGER (a logging.Logger) and log_maker() are defined elsewhere
# in api_caller.py and are not shown here.


class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                # Raises aiohttp.ClientResponseError on 4xx/5xx responses
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except ClientResponseError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    # The token travels in the X-Access-Token header rather than in the URL
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Request the tickets and save them where the Kinesis Agent looks."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)


if __name__ == '__main__':
    # asyncio.run() needs Python 3.7+, while the instance above runs Python 3.6
    asyncio.get_event_loop().run_until_complete(main())
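
log_maker() itself is not shown above, so here is a hypothetical helper in the same spirit (it is not the author's implementation). It appends one CSV line per ticket in the customFieldNames order from agent.json and adds a random record_id; the mapping of the API's value field to cost is an assumption based on the sample response, and target_file would be something like /var/log/airline_tickets/tickets.log.

```python
import csv
import uuid

# Column order must match "customFieldNames" in agent.json
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date", "origin",
               "number_of_changes", "gate", "found_at", "duration", "distance",
               "destination", "depart_date", "actual", "record_id"]


def write_ticket_log(response, target_file):
    """Append one CSV line per ticket and return the number of lines written."""
    written = 0
    with open(target_file, 'a', newline='') as fh:
        # csv defaults to '\r\n', which would leave a stray '\r' in record_id
        writer = csv.writer(fh, lineterminator='\n')
        for item in response.get('data', []):
            # Assumption: the price arrives as "value", as in the sample response
            row = dict(item, cost=item.get('cost', item.get('value')),
                       record_id=str(uuid.uuid4()))
            writer.writerow([row.get(name, '') for name in FIELD_NAMES])
            written += 1
    return written
```

Missing fields become empty columns, so every line always has exactly 14 values and the agent's positional mapping stays aligned.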

To check that the agent is configured correctly and working, let's do a test run of the api_caller.py script:

sudo ./api_caller.py TOKEN

[Screenshot]
Then we check the result in the agent log and on the Monitoring tab of the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

[Screenshots]
As you can see, everything works and the Kinesis Agent successfully sends data to the stream. Now let's set up the consumer.

Setting up Kinesis Data Analytics

Let's move on to the central component of the whole system and create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app:

[Screenshot]
Kinesis Data Analytics lets you run real-time analytics on data from Kinesis Streams using SQL. It is a fully autoscaling service (unlike Kinesis Streams) that:

  1. lets you create new streams (Output Stream) based on queries over the source data;
  2. provides a stream of the errors that occurred while applications were running (Error Stream);
  3. can detect the input data schema automatically (it can be redefined manually if necessary).

This is not a cheap service: 0.11 USD per hour of operation, so use it carefully and delete it when you are done.

Let's connect the application to a data source:

[Screenshot]
Select the stream to connect to (airline_tickets):

[Screenshot]
Next, you need to attach a new IAM role so that the application can read from and write to the streams. To do this, just leave everything unchanged in the Access permissions block:

[Screenshot]
Now let's ask Kinesis to discover the data schema in the stream by clicking the "Discover schema" button. As a result, the IAM role will be updated (a new one will be created) and schema discovery will run on the data that has already arrived in the stream:

[Screenshot]
Now go to the SQL editor. When you click this button, a window appears offering to launch the application; choose to launch it:

[Screenshot]
Paste the following simple query into the SQL editor window and click Save and run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases, you work with tables, using INSERT statements to add records and SELECT statements to query data. In Amazon Kinesis Data Analytics, you work with streams (STREAMs) and pumps (PUMPs): continuous insert queries that move data from one in-application stream into another.

The SQL query above looks for Aeroflot tickets priced below five thousand rubles. All records matching these conditions are placed in the DESTINATION_SQL_STREAM stream.
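
To make the pump semantics concrete, here is a local Python emulation of STREAM_PUMP on plain dictionaries; it is an illustration only, not something Kinesis runs. Each record is tested on its own as it arrives (there is no window or aggregation in this query), and matching rows are projected onto the two columns of DESTINATION_SQL_STREAM.

```python
def pump(records, max_cost=5000, gate='Aeroflot'):
    """Emulate STREAM_PUMP: keep matching rows and project them onto (cost, gate)."""
    for rec in records:
        cost = float(rec['cost'])  # "cost" is declared DOUBLE in DESTINATION_SQL_STREAM
        if cost < max_cost and rec['gate'] == gate:
            yield {'cost': cost, 'gate': rec['gate']}
```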

[Screenshot]
In the Destination block, select the special_stream stream, and in the In-application stream name drop-down list select DESTINATION_SQL_STREAM:

[Screenshot]
The result of all these steps should look something like the image below:

[Screenshot]

Creating and subscribing to an SNS topic

Go to the Simple Notification Service and create a new topic named Airlines:

[Screenshot]
Subscribe to this topic, specifying the mobile phone number that will receive the SMS notifications:

[Screenshot]
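
The topic and the SMS subscription can be sketched in code as well, assuming an SNS client such as boto3.client('sns') and a phone number in E.164 format (for example +7...). CreateTopic returns the existing topic's ARN if you already own a topic with that name, so running it twice is harmless. The function name is hypothetical.

```python
def create_sms_alerts(sns, phone_number, topic_name='Airlines'):
    """Create (or reuse) the topic and subscribe a phone number to it over SMS."""
    topic_arn = sns.create_topic(Name=topic_name)['TopicArn']
    sns.subscribe(TopicArn=topic_arn, Protocol='sms', Endpoint=phone_number)
    return topic_arn
```

The returned ARN is exactly the value the notifier lambda will later need in its TOPIC_ARN environment variable.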

Creating a table in DynamoDB

To store the raw data from the airline_tickets stream, let's create a DynamoDB table with the same name. We will use record_id as the primary key:

[Screenshot]
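
The equivalent API call, sketched with a DynamoDB client such as boto3.client('dynamodb'): record_id is the only key attribute (a string partition key), so no other attributes need to be declared up front. The 5/5 provisioned capacity is an assumption that stays inside the free tier; the function name is hypothetical.

```python
def create_tickets_table(dynamodb, table_name='airline_tickets'):
    """Create a table keyed only by record_id and wait until it exists."""
    dynamodb.create_table(
        TableName=table_name,
        KeySchema=[{'AttributeName': 'record_id', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'record_id', 'AttributeType': 'S'}],
        # Assumed capacity, well within the 25 RCU/WCU free-tier allowance
        ProvisionedThroughput={'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5},
    )
    dynamodb.get_waiter('table_exists').wait(TableName=table_name)
    return table_name
```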

Creating the Collector lambda function

Let's create a lambda function called Collector, whose job is to poll the airline_tickets stream and, when it finds new records there, insert them into the DynamoDB table. Obviously, in addition to the default rights, this lambda needs read access to the Kinesis data stream and write access to DynamoDB.

Creating an IAM role for the Collector lambda function
First, let's create a new IAM role for the lambda named Lambda-TicketsProcessingRole:

[Screenshot]
For a test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable, as shown in the image below:

[Screenshots]

This lambda should be triggered by Kinesis whenever new records arrive in airline_tickets, so we need to add a new trigger:

[Screenshots]
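
Behind the scenes, a Kinesis trigger is an event source mapping. A sketch, assuming a Lambda client such as boto3.client('lambda'); the stream ARN has the form arn:aws:kinesis:<region>:<account-id>:stream/airline_tickets and can be read from describe_stream_summary. BatchSize=100 and StartingPosition='LATEST' are assumed values, and the function name is hypothetical.

```python
def add_kinesis_trigger(lambda_client, stream_arn, function_name, batch_size=100):
    """Subscribe a Lambda function to a Kinesis stream and return the mapping UUID."""
    resp = lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        StartingPosition='LATEST',  # only records that arrive after the mapping exists
        BatchSize=batch_size,
    )
    return resp['UUID']
```

For example: add_kinesis_trigger(boto3.client('lambda'), arn, 'Collector'), where arn comes from boto3.client('kinesis').describe_stream_summary(StreamName='airline_tickets')['StreamDescriptionSummary']['StreamARN'].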
All that remains is to paste in the code and save the lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
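
To try the parsing logic locally, you can build an event with the same shape Lambda delivers for Kinesis: each record's payload sits base64-encoded under Records[i]['kinesis']['data']. Real events carry more fields (partitionKey, sequenceNumber and so on), which this hypothetical helper omits because the Collector only reads data. Keep in mind that feeding such an event to lambda_handler would still write to the real DynamoDB table.

```python
import base64
import json


def make_kinesis_event(items):
    """Wrap dicts the way Lambda delivers Kinesis records (payload only)."""
    return {'Records': [
        {'kinesis': {'data': base64.b64encode(json.dumps(item).encode('utf-8')).decode('ascii')}}
        for item in items
    ]}


def decode_records(records):
    """Same decoding steps as TicketsParser.get_json_data."""
    return [json.loads(base64.b64decode(r['kinesis']['data'])) for r in records]
```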

Creating the notifier lambda function

The second lambda function, which monitors the second stream (special_stream) and sends notifications to SNS, is created the same way. This lambda needs permission to read from Kinesis and to publish messages to the given SNS topic; SNS then delivers them to all of the topic's subscribers (email, SMS, and so on).

Creating an IAM role
First, we create the IAM role Lambda-KinesisAlarm for this lambda, and then assign it to the alarm_notifier lambda being created:

[Screenshots]

This lambda should be triggered when new records arrive in special_stream, so configure its trigger the same way as for the Collector lambda.

To make this lambda easier to configure, let's add a new environment variable, TOPIC_ARN, holding the ARN (Amazon Resource Name) of the Airlines topic:

[Screenshot]
Then paste in the lambda code; it's not complicated at all:

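import os

import boto3

# Created once per container and reused across invocations
SNS_CLIENT = boto3.client('sns')
# ARN of the Airlines topic, set as a Lambda environment variable
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    """Publish an alert to the SNS topic whenever special_stream delivers records."""
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))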
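
The handler above sends a fixed text. If you want the SMS to say which tickets matched, a hypothetical helper like the one below could summarize the records instead. It assumes the Kinesis Analytics destination writes records in JSON format, with the cost and gate columns of DESTINATION_SQL_STREAM as keys.

```python
import base64
import json


def build_alert(records, limit=5):
    """Summarize up to `limit` special_stream records in one short message."""
    tickets = [json.loads(base64.b64decode(r['kinesis']['data'])) for r in records]
    parts = ['{} for {}'.format(t.get('gate'), t.get('cost')) for t in tickets[:limit]]
    return 'Cheap tickets found: ' + '; '.join(parts)
```

Inside lambda_handler this would replace the fixed text: Message=build_alert(event['Records']). Capping the list keeps the message short enough for SMS.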

That completes the manual setup of the system. All that remains is to test it and make sure everything is configured correctly.

Deploying from Terraform code

Required preparation

Terraform is a very convenient open-source tool for deploying infrastructure from code. It has its own syntax that is easy to learn, and there are plenty of examples of how and what to deploy. The Atom editor and Visual Studio Code have many handy plugins that make working with Terraform easier.

You can download the distribution from the official Terraform website. A detailed look at all of Terraform's capabilities is beyond the scope of this article, so we will stick to the main points.

How to get started

The full project code is in my repository (github.com/igorgorbenko/aviasales_kinesis); clone it locally. Before you start, make sure the AWS CLI is installed and configured, because Terraform will look for credentials in the ~/.aws/credentials file.

It is good practice to run the plan command before deploying the whole infrastructure, to see what Terraform is about to create for us in the cloud:

terraform.exe plan

You will be prompted for a phone number to send notifications to. You don't have to enter it at this stage.

[Screenshot]
Having reviewed the execution plan, we can start creating the resources:

terraform.exe apply

After you run this command, you will again be asked for a phone number; type "yes" when asked to confirm the actions. This sets up the entire infrastructure, performs all the necessary EC2 configuration, deploys the lambda functions, and so on.

Once Terraform has successfully created all the resources, you need to open the Kinesis Analytics application settings in the console (unfortunately, I didn't find a way to do this step directly from code).

Launch the application:

[Screenshot]
After that, explicitly set the in-application stream name by selecting it from the drop-down list:

[Screenshots]
Now everything is ready to go.

Testing the application

However you deployed the system, manually or from Terraform code, it works the same way.

Log in over SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:

sudo ./api_caller.py TOKEN

All you have to do now is wait for an SMS on your phone:

[Screenshot]
The SMS arrives on the phone in about a minute:

[Screenshot]
It remains to check that the records were saved in DynamoDB for later, more detailed analysis. The airline_tickets table contains roughly the following data:

[Screenshot]

Conclusion

In this article we built an online data processing system based on Amazon Kinesis. We looked at using the Kinesis Agent together with Kinesis Data Streams, at real-time analytics in Kinesis Analytics using SQL, and at how Amazon Kinesis interacts with other AWS services.

We deployed the system in two ways: a rather long manual one and a quick one from Terraform code.

All of the project's source code is in my GitHub repository; I suggest you take a look at it.

I'm happy to discuss the article and look forward to your comments. Constructive criticism is welcome.

I wish you success!

Source: www.habr.com
