Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva

Habari Habr!

Je, unapenda ndege zinazoruka? Ninaipenda, lakini wakati wa kujitenga nilipenda pia kuchambua data kwenye tikiti za ndege kutoka kwa rasilimali moja inayojulikana - Aviasales.

Leo tutachambua kazi ya Amazon Kinesis, tutaunda mfumo wa utiririshaji na uchanganuzi wa wakati halisi, kusakinisha hifadhidata ya Amazon DynamoDB NoSQL kama hifadhi kuu ya data, na kusanidi arifa za SMS kwa tikiti zinazovutia.

Maelezo yote ni chini ya kukata! Nenda!

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva

Utangulizi

Kwa mfano, tunahitaji ufikiaji Aviasales API. Upatikanaji wake hutolewa bila malipo na bila vikwazo; unahitaji tu kujiandikisha katika sehemu ya "Watengenezaji" ili kupokea tokeni yako ya API ili kufikia data.

Kusudi kuu la kifungu hiki ni kutoa uelewa wa jumla wa utumiaji wa utiririshaji wa habari katika AWS; tunazingatia kwamba data iliyorejeshwa na API iliyotumiwa sio ya kisasa kabisa na hupitishwa kutoka kwa kashe, ambayo ni. iliyoundwa kulingana na utafutaji wa watumiaji wa tovuti za Aviasales.ru na Jetradar.com kwa saa 48 zilizopita.

Kinesis-ajenti, iliyosakinishwa kwenye mashine ya kuzalisha, iliyopokelewa kupitia API itachanganua kiotomatiki na kusambaza data kwa mtiririko unaotaka kupitia Kinesis Data Analytics. Toleo ghafi la mtiririko huu litaandikwa moja kwa moja kwenye duka. Hifadhi ghafi ya data iliyotumwa katika DynamoDB itaruhusu uchanganuzi wa kina wa tikiti kupitia zana za BI, kama vile AWS Quick Sight.

Tutazingatia chaguzi mbili za kupeleka miundombinu yote:

  • Mwongozo - kupitia AWS Management Console;
  • Miundombinu kutoka kwa nambari ya Terraform ni ya waendeshaji wavivu wa otomatiki;

Usanifu wa mfumo ulioendelezwa

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Vipengele vilivyotumika:

  • Aviasales API - data iliyorejeshwa na API hii itatumika kwa kazi zote zinazofuata;
  • Mfano wa Mtayarishaji wa EC2 - mashine ya kawaida kwenye wingu ambayo mtiririko wa data ya ingizo utatolewa:
    • Wakala wa Kinesi ni programu ya Java iliyosakinishwa ndani ya mashine ambayo hutoa njia rahisi ya kukusanya na kutuma data kwa Kinesis (Mikondo ya Data ya Kinesis au Kinesis Firehose). Wakala hufuatilia kila mara seti ya faili katika saraka maalum na kutuma data mpya kwa Kinesis;
    • Hati ya Kipigaji cha API - Hati ya Python ambayo hufanya maombi kwa API na kuweka majibu kwenye folda ambayo inafuatiliwa na Wakala wa Kinesis;
  • Mito ya data ya Kinesis - huduma ya utiririshaji wa data ya wakati halisi na uwezo mpana wa kuongeza;
  • Kinesis Analytics ni huduma isiyo na seva ambayo hurahisisha uchanganuzi wa data ya kutiririsha kwa wakati halisi. Amazon Kinesis Data Analytics husanidi rasilimali za programu na mizani kiotomatiki kushughulikia kiasi chochote cha data inayoingia;
  • AWS Lambda β€” huduma inayokuruhusu kuendesha msimbo bila kuhifadhi nakala au kusanidi seva. Nguvu zote za kompyuta hupimwa kiotomatiki kwa kila simu;
  • Amazon DynamoDB - Hifadhidata ya jozi za thamani-msingi na hati zinazotoa muda wa kusubiri wa chini ya milisekunde 10 unapoendeshwa kwa kiwango chochote. Unapotumia DynamoDB, huhitaji kutoa, kurekebisha, au kudhibiti seva zozote. DynamoDB hukuza jedwali kiotomatiki ili kurekebisha kiasi cha rasilimali zinazopatikana na kudumisha utendakazi wa juu. Hakuna utawala wa mfumo unaohitajika;
  • Amazon SNS - huduma inayodhibitiwa kikamilifu ya kutuma ujumbe kwa kutumia kielelezo cha mchapishaji-msajili (Pub/Sub), ambacho unaweza kutenga huduma ndogo, mifumo iliyosambazwa na programu zisizo na seva. SNS inaweza kutumika kutuma taarifa kwa watumiaji wa hatima kupitia arifa zinazotumwa na kifaa cha mkononi, SMS na barua pepe.

Mafunzo ya awali

Ili kuiga mtiririko wa data, niliamua kutumia maelezo ya tikiti ya ndege iliyorejeshwa na API ya Aviasales. KATIKA nyaraka orodha pana ya njia tofauti, wacha tuchukue moja yao - "Kalenda ya Bei ya Kila Mwezi", ambayo inarudisha bei kwa kila siku ya mwezi, iliyowekwa na idadi ya uhamishaji. Ikiwa hutabainisha mwezi wa utafutaji katika ombi, taarifa itarejeshwa kwa mwezi unaofuata wa sasa.

Kwa hiyo, hebu tujiandikishe na tupate ishara yetu.

Mfano wa ombi ni hapa chini:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Njia iliyo hapo juu ya kupokea data kutoka kwa API kwa kutaja ishara katika ombi itafanya kazi, lakini napendelea kupitisha ishara ya ufikiaji kupitia kichwa, kwa hivyo tutatumia njia hii kwenye hati ya api_caller.py.

Jibu mfano:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Mfano jibu la API hapo juu linaonyesha tikiti kutoka St. Petersburg hadi Phuk... Lo, ni ndoto gani...
Kwa kuwa ninatoka Kazan, na Phuket sasa ni "ndoto tu", hebu tutafute tiketi kutoka St. Petersburg hadi Kazan.

Inadhania kuwa tayari una akaunti ya AWS. Ningependa mara moja kuteka kipaumbele maalum kwa ukweli kwamba Kinesis na kutuma arifa kupitia SMS hazijumuishwa katika kila mwaka. Kiwango cha bure (matumizi ya bure). Lakini hata licha ya hili, kwa kuzingatia dola kadhaa, inawezekana kabisa kujenga mfumo uliopendekezwa na kucheza nao. Na, bila shaka, usisahau kufuta rasilimali zote baada ya kuwa hazihitajiki tena.

Kwa bahati nzuri, chaguo za kukokotoa za DynamoDb na lambda zitakuwa bila malipo kwetu ikiwa utatimiza vikomo vya bure vya kila mwezi. Kwa mfano, kwa DynamoDB: GB 25 ya hifadhi, 25 WCU/RCU na hoja milioni 100. Na kazi ya milioni ya lambda huita kwa mwezi.

Usambazaji wa mfumo wa mwongozo

Kuweka Mitiririko ya Data ya Kinesis

Hebu tuende kwenye huduma ya Mipasho ya Data ya Kinesis na tuunde mitiririko miwili mipya, shard moja kwa kila moja.

Jembe ni nini?
Shard ni kitengo cha msingi cha kuhamisha data cha mkondo wa Amazon Kinesis. Sehemu moja hutoa uhamisho wa data ya pembejeo kwa kasi ya 1 MB / s na uhamisho wa data ya pato kwa kasi ya 2 MB / s. Sehemu moja inaweza kutumia hadi maingizo 1000 ya PUT kwa sekunde. Wakati wa kuunda mkondo wa data, unahitaji kutaja idadi inayotakiwa ya sehemu. Kwa mfano, unaweza kuunda mkondo wa data na sehemu mbili. Mtiririko huu wa data utatoa uhamishaji wa data ya pembejeo kwa 2 MB/s na uhamishaji wa data ya pato kwa 4 MB/s, ikisaidia hadi rekodi 2000 za PUT kwa sekunde.

Kadiri shards zinavyoongezeka kwenye mkondo wako, ndivyo upitishaji wake unavyoongezeka. Kimsingi, hii ndio jinsi mtiririko hupimwa - kwa kuongeza shards. Lakini shards zaidi unayo, bei ya juu. Kila shard inagharimu senti 1,5 kwa saa na senti 1.4 ya ziada kwa kila milioni ya vitengo vya malipo ya PUT.

Hebu tuunde mtiririko mpya kwa jina tikiti_za_ndege, kipande 1 kitamtosha:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Sasa tuunde thread nyingine yenye jina maalum_mkondo:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva

Mpangilio wa mzalishaji

Ili kuchanganua kazi, inatosha kutumia mfano wa kawaida wa EC2 kama mtayarishaji wa data. Si lazima iwe mashine pepe yenye nguvu na ghali; doa t2.micro itafanya vyema.

Kumbuka muhimu: kwa mfano, unapaswa kutumia picha - Amazon Linux AMI 2018.03.0, ina mipangilio machache ya kuzindua haraka Wakala wa Kinesis.

Nenda kwenye huduma ya EC2, unda mashine mpya ya mtandaoni, chagua AMI inayotaka na aina t2.micro, ambayo imejumuishwa kwenye Kiwango cha Bure:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Ili mashine mpya iliyoundwa iweze kuingiliana na huduma ya Kinesis, ni lazima ipewe haki za kufanya hivyo. Njia bora ya kufanya hivyo ni kugawa Jukumu la IAM. Kwa hivyo, kwenye Hatua ya 3: Sanidi Maelezo ya Insta skrini, unapaswa kuchagua Unda Jukumu jipya la IAM:

Kuunda jukumu la IAM kwa EC2
Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Katika dirisha linalofungua, chagua kuwa tunaunda jukumu jipya la EC2 na uende kwenye sehemu ya Ruhusa:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Kwa kutumia mfano wa mafunzo, si lazima tuingie katika hitilafu zote za usanidi wa punjepunje wa haki za rasilimali, kwa hivyo tutachagua sera zilizowekwa mapema na Amazon: AmazonKinesisFullAccess na CloudWatchFullAccess.

Hebu tupe jina la maana kwa jukumu hili, kwa mfano: EC2-KinesisStreams-FullAccess. Matokeo yanapaswa kuwa sawa na inavyoonyeshwa kwenye picha hapa chini:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Baada ya kuunda jukumu hili jipya, usisahau kuiambatisha kwa mfano wa mashine iliyoundwa:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Hatubadilishi kitu kingine chochote kwenye skrini hii na kwenda kwenye madirisha yanayofuata.

Mipangilio ya diski kuu inaweza kuachwa kama chaguo-msingi, pamoja na vitambulisho (ingawa ni mazoezi mazuri kutumia vitambulisho, angalau toa mfano jina na uonyeshe mazingira).

Sasa tuko kwenye Hatua ya 6: Sanidi kichupo cha Kikundi cha Usalama, ambapo unahitaji kuunda mpya au kutaja kikundi chako cha Usalama kilichopo, ambacho hukuruhusu kuunganishwa kupitia ssh (bandari 22) kwa mfano. Chagua Chanzo -> IP yangu hapo na unaweza kuzindua mfano.

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Mara tu inapobadilika kuwa hali inayoendesha, unaweza kujaribu kuunganishwa nayo kupitia ssh.

Ili kuweza kufanya kazi na Wakala wa Kinesis, baada ya kuunganishwa kwa mafanikio kwenye mashine, lazima uweke amri zifuatazo kwenye terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Wacha tuunde folda ili kuhifadhi majibu ya API:

sudo mkdir /var/log/airline_tickets

Kabla ya kuanza wakala, unahitaji kusanidi usanidi wake:

sudo vim /etc/aws-kinesis/agent.json

Yaliyomo kwenye faili ya agent.json inapaswa kuonekana kama hii:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Kama inavyoweza kuonekana kutoka kwa faili ya usanidi, wakala atafuatilia faili zilizo na kiendelezi cha .logi kwenye saraka /var/log/airline_tickets/, azichanganue na kuzihamisha hadi kwenye mtiririko wa tiketi za ndege.

Tunaanzisha tena huduma na hakikisha kuwa iko na inafanya kazi:

sudo service aws-kinesis-agent restart

Sasa wacha tupakue hati ya Python ambayo itaomba data kutoka kwa API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Hati ya api_caller.py huomba data kutoka kwa Aviasales na huhifadhi jibu lililopokelewa katika saraka ambayo wakala wa Kinesis huchanganua. Utekelezaji wa hati hii ni ya kawaida kabisa, kuna darasa la TicketsApi, hukuruhusu kuvuta API bila usawa. Tunapitisha kichwa na ishara na tunaomba vigezo kwa darasa hili:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Ili kujaribu mipangilio sahihi na utendakazi wa wakala, wacha tujaribu kutekeleza hati ya api_caller.py:

sudo ./api_caller.py TOKEN

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Na tunaangalia matokeo ya kazi katika kumbukumbu za Wakala na kwenye kichupo cha Ufuatiliaji katika mtiririko wa data wa tiketi za ndege:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Kama unavyoona, kila kitu hufanya kazi na Wakala wa Kinesis hutuma kwa ufanisi data kwenye mkondo. Sasa hebu tusanidi mtumiaji.

Kuanzisha Takwimu za Kinesis

Wacha tuendelee hadi sehemu kuu ya mfumo mzima - kuunda programu mpya katika Kinesis Data Analytics inayoitwa kinesis_analytics_airlines_app:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Uchanganuzi wa Data wa Kinesis hukuruhusu kufanya uchanganuzi wa data katika wakati halisi kutoka kwa Mipasho ya Kinesis kwa kutumia lugha ya SQL. Ni huduma ya kuongeza otomatiki kikamilifu (tofauti na Mito ya Kinesis) ambayo:

  1. hukuruhusu kuunda mitiririko mipya (Mkondo wa Pato) kulingana na maombi ya kupata data;
  2. hutoa mtiririko na hitilafu zilizotokea wakati programu zinaendeshwa (Mtiririko wa Hitilafu);
  3. inaweza kuamua kiotomatiki mpango wa data ya kuingiza (inaweza kufafanuliwa upya ikiwa ni lazima).

Hii sio huduma ya bei nafuu - 0.11 USD kwa saa ya kazi, kwa hiyo unapaswa kuitumia kwa uangalifu na kuifuta unapomaliza.

Wacha tuunganishe programu kwenye chanzo cha data:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Chagua mtiririko ambao tutaunganisha kwa (tiketi_za_ndege):

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Ifuatayo, unahitaji kuambatisha Jukumu jipya la IAM ili programu iweze kusoma kutoka kwa mtiririko na kuandika kwa mtiririko. Ili kufanya hivyo, inatosha kutobadilisha chochote kwenye kizuizi cha idhini ya Upataji:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Sasa hebu tuombe ugunduzi wa schema ya data kwenye mkondo; ili kufanya hivyo, bofya kitufe cha "Gundua schema". Kwa hivyo, jukumu la IAM litasasishwa (jipya litaundwa) na ugunduzi wa taratibu utazinduliwa kutoka kwa data ambayo tayari imefika kwenye mkondo:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Sasa unahitaji kwenda kwa mhariri wa SQL. Unapobofya kitufe hiki, dirisha litaonekana kukuuliza uanzishe programu - chagua unachotaka kuzindua:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Ingiza swali rahisi lifuatalo kwenye dirisha la mhariri wa SQL na ubofye Hifadhi na Uendeshe SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Katika hifadhidata za uhusiano, unafanya kazi na majedwali kwa kutumia taarifa za INSERT kuongeza rekodi na taarifa CHAGUA ili kuuliza data. Katika Amazon Kinesis Data Analytics, unafanya kazi na mitiririko (STREAMs) na pampu (PUMPs)β€”maombi ya mara kwa mara ya kuingiza ambayo huingiza data kutoka kwa mkondo mmoja katika programu hadi mkondo mwingine.

Hoja ya SQL iliyowasilishwa hapo juu hutafuta tikiti za Aeroflot kwa gharama iliyo chini ya rubles elfu tano. Rekodi zote zinazotimiza masharti haya zitawekwa katika mkondo wa DESTINATION_SQL_STREAM.

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Katika sehemu ya Lengwa, chagua mtiririko_ maalum, na katika orodha kunjuzi ya jina la mtiririko wa programu-tumizi DESTINATION_SQL_STREAM:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Matokeo ya udanganyifu wote yanapaswa kuwa kitu sawa na picha hapa chini:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva

Kuunda na kujiandikisha kwa mada ya SNS

Nenda kwa Huduma Rahisi ya Arifa na uunde mada mpya hapo kwa jina Airlines:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Jiandikishe kwa mada hii na uonyeshe nambari ya simu ya rununu ambayo arifa za SMS zitatumwa:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva

Unda jedwali katika DynamoDB

Ili kuhifadhi data ghafi kutoka kwa mtiririko wao wa tiketi za ndege, hebu tuunde jedwali katika DynamoDB kwa jina sawa. Tutatumia record_id kama ufunguo msingi:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva

Kuunda mkusanyiko wa kazi ya lambda

Wacha tuunde chaguo la kukokotoa lambda liitwalo Mtoza, ambaye kazi yake itakuwa kupigia kura mtiririko wa tiketi_ya ndege na, ikiwa rekodi mpya zitapatikana hapo, weka rekodi hizi kwenye jedwali la DynamoDB. Ni wazi, pamoja na haki chaguo-msingi, lambda hii lazima iwe na ufikiaji wa kusoma kwa mtiririko wa data wa Kinesis na ufikiaji wa kuandika kwa DynamoDB.

Kuunda jukumu la IAM kwa kazi ya mkusanyaji lambda
Kwanza, hebu tuunde jukumu jipya la IAM kwa lambda inayoitwa Lambda-TicketsProcessingRole:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Kwa mfano wa jaribio, sera zilizosanidiwa awali za AmazonKinesisReadOnlyAccess na AmazonDynamoDBFullAccess zinafaa kabisa, kama inavyoonyeshwa kwenye picha hapa chini:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva

Lambda hii inapaswa kuzinduliwa na kichochezi kutoka kwa Kinesis maingizo mapya yanapoingia kwenye airline_stream, kwa hivyo tunahitaji kuongeza kichochezi kipya:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Kinachobaki ni kuingiza msimbo na kuhifadhi lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Kuunda arifa ya utendakazi wa lambda

Kazi ya pili ya lambda, ambayo itafuatilia mkondo wa pili (special_stream) na kutuma arifa kwa SNS, imeundwa kwa njia sawa. Kwa hivyo, lambda hii lazima iwe na ufikiaji wa kusoma kutoka kwa Kinesis na kutuma ujumbe kwa mada fulani ya SNS, ambayo itatumwa na huduma ya SNS kwa wanachama wote wa mada hii (barua pepe, SMS, nk).

Kuunda jukumu la IAM
Kwanza, tunaunda jukumu la IAM Lambda-KinesisAlarm kwa lambda hii, na kisha kukabidhi jukumu hili kwa alarm_notifier lambda inayoundwa:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva

Lambda hii inapaswa kufanya kazi kwenye kichochezi cha rekodi mpya ili kuingia special_stream, kwa hivyo unahitaji kusanidi kichochezi kwa njia sawa na tulivyofanya kwa Mtozaji lambda.

Ili kurahisisha kusanidi lambda hii, hebu tuanzishe kigeu kipya cha mazingira - TOPIC_ARN, ambapo tunaweka ANR (Majina ya Rejea ya Amazon) ya mada ya Mashirika ya Ndege:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Na ingiza nambari ya lambda, sio ngumu hata kidogo:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Inaonekana kwamba hapa ndipo usanidi wa mfumo wa mwongozo umekamilika. Kilichobaki ni kujaribu na kuhakikisha kuwa tumesanidi kila kitu kwa usahihi.

Tumia kutoka kwa msimbo wa Terraform

Maandalizi yanayohitajika

Terraform ni zana rahisi sana ya chanzo-wazi kwa kupeleka miundombinu kutoka kwa nambari. Ina syntax yake ambayo ni rahisi kujifunza na ina mifano mingi ya jinsi na nini cha kupeleka. Mhariri wa Atom au Msimbo wa Visual Studio una programu-jalizi nyingi ambazo hurahisisha kufanya kazi na Terraform.

Unaweza kupakua usambazaji hivyo. Uchambuzi wa kina wa uwezo wote wa Terraform ni zaidi ya upeo wa kifungu hiki, kwa hivyo tutajiwekea kikomo kwa vidokezo kuu.

Jinsi ya kuanza

Nambari kamili ya mradi ni katika hazina yangu. Tunajitengenezea hazina. Kabla ya kuanza, unahitaji kuhakikisha kuwa AWS CLI imewekwa na kusanidiwa, kwa sababu... Terraform itatafuta vitambulisho katika ~/.aws/faili la vitambulisho.

Mazoezi mazuri ni kutekeleza agizo la mpango kabla ya kupeleka miundombinu yote ili kuona ni nini Terraform inatutengenezea kwa sasa kwenye wingu:

terraform.exe plan

Utaulizwa kuingiza nambari ya simu kutuma arifa. Sio lazima kuiingiza katika hatua hii.

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Baada ya kuchambua mpango wa uendeshaji wa programu, tunaweza kuanza kuunda rasilimali:

terraform.exe apply

Baada ya kutuma amri hii, utaulizwa tena kuingiza nambari ya simu; piga "ndiyo" wakati swali kuhusu kufanya vitendo halisi linaonyeshwa. Hii itawawezesha kuanzisha miundombinu yote, kutekeleza usanidi wote muhimu wa EC2, kupeleka kazi za lambda, nk.

Baada ya rasilimali zote kuundwa kwa ufanisi kupitia msimbo wa Terraform, unahitaji kuingia katika maelezo ya programu ya Kinesis Analytics (kwa bahati mbaya, sikupata jinsi ya kufanya hivyo moja kwa moja kutoka kwa kanuni).

Fungua programu:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Baada ya hayo, lazima uweke kwa uwazi jina la mtiririko wa programu kwa kuchagua kutoka kwenye orodha kunjuzi:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Sasa kila kitu kiko tayari kwenda.

Kujaribu maombi

Bila kujali jinsi ulivyosambaza mfumo, kwa mikono au kupitia nambari ya Terraform, itafanya kazi sawa.

Tunaingia kupitia SSH kwa mashine pepe ya EC2 ambapo Wakala wa Kinesis amesakinishwa na kuendesha hati ya api_caller.py

sudo ./api_caller.py TOKEN

Unachotakiwa kufanya ni kusubiri SMS kwa nambari yako:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
SMS - ujumbe unafika kwenye simu kwa karibu dakika 1:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva
Inabakia kuona ikiwa rekodi zilihifadhiwa katika hifadhidata ya DynamoDB kwa uchanganuzi unaofuata, wa kina zaidi. Jedwali la tikiti za ndege lina takriban data ifuatayo:

Ujumuishaji wa API ya Aviasales na Amazon Kinesis na unyenyekevu usio na seva

Hitimisho

Katika kipindi cha kazi iliyofanywa, mfumo wa usindikaji wa data mtandaoni ulijengwa kulingana na Amazon Kinesis. Chaguo za kutumia Wakala wa Kinesis kwa kushirikiana na Mikondo ya Data ya Kinesis na uchanganuzi wa wakati halisi wa Kinesis Analytics kwa kutumia amri za SQL, pamoja na mwingiliano wa Amazon Kinesis na huduma zingine za AWS zilizingatiwa.

Tulisambaza mfumo ulio hapo juu kwa njia mbili: mwongozo mrefu na wa haraka kutoka kwa nambari ya Terraform.

Msimbo wote wa chanzo cha mradi unapatikana kwenye hazina yangu ya GitHub, ninapendekeza ujitambulishe nayo.

Nina furaha kujadili makala, natarajia maoni yako. Natumai ukosoaji wenye kujenga.

Nakutakia mafanikio!

Chanzo: mapenzi.com

Kuongeza maoni