Aviasales API integration with Amazon Kinesis and serverless simplicity

Hi, Habr!

Do you like flying on planes? I do, but during self-isolation I also fell in love with analyzing airline ticket data from one well-known resource, Aviasales.

Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, set up the Amazon DynamoDB NoSQL database as the main data store, and configure SMS notifications for interesting tickets.

All the details are under the cut! Let's go!

Introduction

For this example, we need access to the Aviasales API. Access is provided free of charge and without restrictions; you only need to register in the "Developers" section to receive your API token for accessing the data.

The main goal of this article is to give a general understanding of how streaming data is used in AWS. We take into account that the data returned by the API is not strictly up to date: it is served from a cache built from searches made by users of Aviasales.ru and Jetradar.com over the past 48 hours.

The Kinesis Agent installed on the producer machine will automatically parse the data received through the API and send it to the desired stream via Kinesis Data Analytics. The raw version of this stream will be written directly to the store. The raw data store deployed in DynamoDB will allow deeper ticket analysis through BI tools such as AWS QuickSight.

We will consider two options for deploying the entire infrastructure:

  • Manual - through the AWS Management Console;
  • Infrastructure from Terraform code - for lazy automators.

Architecture of the system being developed

Components used:

  • Aviasales API - the data returned by this API will be used for all subsequent work;
  • EC2 Producer Instance - a regular virtual machine in the cloud on which the input data stream will be generated:
    • Kinesis Agent - a Java application installed locally on the machine that provides an easy way to collect and send data to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent continuously monitors a set of files in the specified directories and sends new data to Kinesis;
    • API Caller Script - a Python script that makes requests to the API and puts the response into a folder monitored by the Kinesis Agent;
  • Kinesis Data Streams - a real-time data streaming service with wide scaling capabilities;
  • Kinesis Analytics - a serverless service that simplifies the analysis of streaming data in real time. Amazon Kinesis Data Analytics configures application resources and scales automatically to handle any volume of incoming data;
  • AWS Lambda - a service that lets you run code without provisioning or managing servers. All computing power is scaled automatically for each invocation;
  • Amazon DynamoDB - a key-value and document database that provides latency of less than 10 milliseconds at any scale. When using DynamoDB, you do not need to provision, patch, or manage any servers. DynamoDB automatically scales tables to adjust the amount of available resources and maintain high performance. No system administration is required;
  • Amazon SNS - a fully managed messaging service based on the publisher/subscriber (Pub/Sub) model, which can be used to decouple microservices, distributed systems, and serverless applications. SNS can be used to deliver information to end users via mobile push notifications, SMS messages, and email.

Preliminary preparation

To emulate the data flow, I decided to use the airline ticket information returned by the Aviasales API. The documentation lists quite a few different methods; let's take one of them, the "Monthly Price Calendar", which returns prices for each day of the month, grouped by the number of transfers. If you do not specify the search month in the request, information will be returned for the month following the current one.

So, let's register and get our token.

A sample request is below:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

The above way of receiving data from the API, with the token specified in the request itself, will work, but I prefer to pass the access token in a header, so we will use that approach in the api_caller.py script.
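
As a rough illustration of the header-based approach, the sketch below builds (but does not send) a request with the standard library. The X-Access-Token header name is the one used in api_caller.py; the build_request helper and the KZN destination code (Kazan) are assumptions made only for this example.

```python
from urllib.parse import urlencode
from urllib.request import Request

API_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_request(api_token, origin="LED", destination="KZN", currency="rub"):
    """Build a GET request that carries the token in a header, not in the URL.

    Hypothetical helper: the defaults are illustrative assumptions.
    """
    query = urlencode({
        "currency": currency,
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    })
    # The token travels in the X-Access-Token header, so it does not
    # end up in the query string.
    return Request(f"{API_URL}?{query}", headers={"X-Access-Token": api_token})


request = build_request("TOKEN_API")
print(request.full_url)
```

Passing the resulting object to urllib.request.urlopen would perform the actual call; that step is left out here so the sketch runs without network access or a real token.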

Sample response:

{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

The sample API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I am from Kazan, and Phuket is now "only a dream", let's look for tickets from St. Petersburg to Kazan.

It is assumed that you already have an AWS account. I would like to draw special attention right away to the fact that Kinesis and SMS notifications are not included in the annual Free Tier. Even so, with a couple of dollars in mind, it is quite possible to build the proposed system and play with it. And, of course, do not forget to delete all resources once they are no longer needed.

Fortunately, DynamoDB and Lambda functions will be free for us if we stay within the monthly free limits. For example, for DynamoDB: 25 GB of storage, 25 WCU/RCU, and 100 million requests. And a million Lambda function invocations per month.

Manual system deployment

Setting up Kinesis Data Streams

Let's go to the Kinesis Data Streams service and create two new streams, with one shard each.

What is a shard?
A shard is the base throughput unit of an Amazon Kinesis stream. One shard provides 1 MB/s of data input and 2 MB/s of data output. One shard supports up to 1000 PUT records per second. When creating a data stream, you must specify the required number of shards. For example, you can create a data stream with two shards. This stream will provide 2 MB/s of data input and 4 MB/s of data output, supporting up to 2000 PUT records per second.

The more shards in your stream, the greater its throughput. In principle, this is how streams are scaled: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus an additional 1.4 cents for every million PUT payload units.
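
The shard arithmetic above can be sketched in a few lines. The per-shard limits and prices are the figures quoted in this article (they may differ from current AWS pricing); the function names are made up for the example.

```python
# Per-shard limits and prices as quoted in the article (assumed, not current pricing).
WRITE_MB_PER_SHARD = 1
READ_MB_PER_SHARD = 2
PUT_RECORDS_PER_SHARD = 1000
USD_PER_SHARD_HOUR = 0.015
USD_PER_MILLION_PUT_UNITS = 0.014


def stream_capacity(shards):
    """Return the total throughput of a stream with the given shard count."""
    return {
        "write_mb_per_s": WRITE_MB_PER_SHARD * shards,
        "read_mb_per_s": READ_MB_PER_SHARD * shards,
        "put_records_per_s": PUT_RECORDS_PER_SHARD * shards,
    }


def stream_cost_usd(shards, hours, million_put_units):
    """Estimate the cost: shard-hours plus PUT payload units."""
    return (shards * hours * USD_PER_SHARD_HOUR
            + million_put_units * USD_PER_MILLION_PUT_UNITS)


print(stream_capacity(2))
# Two single-shard streams running for a day with one million PUT units in total.
print(round(stream_cost_usd(shards=2, hours=24, million_put_units=1), 3))
```

For an experiment that runs for a few hours, the shard-hour term dominates, which is why deleting the streams afterwards matters more than the volume of test data.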

Let's create a new stream named airline_tickets; 1 shard will be enough for it:

Now let's create another stream named special_stream:
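
The article creates both streams in the console. As an alternative sketch, the same two streams could be created from Python. The create_streams helper is hypothetical; it only assumes a client object exposing create_stream(StreamName=..., ShardCount=...), which is how the boto3 Kinesis client is called.

```python
def create_streams(kinesis_client,
                   names=("airline_tickets", "special_stream"),
                   shard_count=1):
    """Create one single-shard stream per name and return the names.

    Hypothetical helper: kinesis_client is expected to behave like
    boto3.client("kinesis").
    """
    for name in names:
        kinesis_client.create_stream(StreamName=name, ShardCount=shard_count)
    return list(names)
```

With AWS credentials configured, it would be called as create_streams(boto3.client("kinesis")). Stream creation is asynchronous, so a stream may stay in the CREATING state for a short while before it can be used.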

Setting up the producer

For this task, a regular EC2 instance is enough as the data producer. It does not have to be a powerful, expensive virtual machine; a t2.micro will do just fine.

Important note: for this example, use the Amazon Linux AMI 2018.03.0 image; it needs less configuration to get the Kinesis Agent running quickly.

Go to the EC2 service, create a new virtual machine, and select the desired AMI with the t2.micro instance type, which is included in the Free Tier.

For the newly created virtual machine to be able to interact with the Kinesis service, it must be granted the rights to do so. The best way to do this is to assign an IAM role. Therefore, on the Step 3: Configure Instance Details screen, select Create new IAM role.

Creating an IAM role for EC2

In the window that opens, select that we are creating a new role for EC2 and go to the Permissions section.

Since this is a training example, we do not need to go into all the intricacies of granular resource permissions, so we will select the policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.

Let's give this role a meaningful name, for example EC2-KinesisStreams-FullAccess.

After creating this new role, do not forget to attach it to the virtual machine being created.

We do not change anything else on this screen and move on to the next screens.

The hard drive settings can be left at their defaults, as can the tags (although it is good practice to use tags: at least give the instance a name and indicate the environment).

Now we are on the Step 6: Configure Security Group tab, where you need to create a new security group or specify an existing one that allows you to connect to the instance via ssh (port 22). Select Source -> My IP there and you can launch the instance.

As soon as it switches to the running status, you can try to connect to it via ssh.

To be able to work with the Kinesis Agent, after successfully connecting to the machine, enter the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Let's create a folder to store the API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to edit its configuration:

sudo vim /etc/aws-kinesis/agent.json

The contents of the agent.json file should look like this:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As can be seen from the configuration file, the agent will monitor files with the .log extension in the /var/log/airline_tickets/ directory, parse them, and transfer them to the airline_tickets stream.
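
Since the CSVTOJSON option maps each delimited line onto customFieldNames by position, the producer has to write its fields in exactly that order. The sketch below shows one way such a line could be formed. It is not the author's log_maker function; the to_csv_line helper, the mapping of the API field value to cost, and the UUID used for record_id are assumptions for illustration.

```python
import uuid

# Same order as customFieldNames in agent.json.
FIELD_NAMES = [
    "cost", "trip_class", "show_to_affiliates", "return_date", "origin",
    "number_of_changes", "gate", "found_at", "duration", "distance",
    "destination", "depart_date", "actual", "record_id",
]


def to_csv_line(ticket):
    """Turn one API item into a comma-separated line in FIELD_NAMES order.

    Assumes no field value contains a comma.
    """
    row = dict(ticket)
    row["cost"] = row.pop("value", "")      # assumption: API "value" is the price
    row["record_id"] = str(uuid.uuid4())    # assumption: any unique id will do
    # Missing fields become empty strings so the positions stay aligned.
    return ",".join(str(row.get(name, "")) for name in FIELD_NAMES)


sample = {
    "show_to_affiliates": True, "trip_class": 0, "origin": "LED",
    "destination": "HKT", "depart_date": "2015-10-01", "return_date": "",
    "number_of_changes": 1, "value": 29127,
    "found_at": "2015-09-24T00:06:12+04:00", "distance": 8015, "actual": True,
}
line = to_csv_line(sample)
print(line)
```

Appending such lines to a file matching /var/log/airline_tickets/*log is what lets the agent pick them up.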

We restart the service and make sure that it is up and running:

sudo service aws-kinesis-agent restart

Now let's download the Python script that will request data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and saves the received response in the directory that the Kinesis Agent scans. The implementation of this script is quite standard: there is a TicketsApi class that lets you call the API asynchronously. We pass a header with the token and the request parameters to this class:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

To check that the agent is configured correctly and works, let's do a test run of the api_caller.py script:

sudo ./api_caller.py TOKEN

And we look at the result in the Agent logs and on the Monitoring tab of the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

As you can see, everything works and the Kinesis Agent successfully sends data to the stream. Now let's configure the consumer.

Setting up Kinesis Data Analytics

Let's move on to the central component of the whole system: create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app.

Kinesis Data Analytics lets you run real-time analytics on data from Kinesis Streams using SQL. It is a fully autoscaling service (unlike Kinesis Streams) that:

  1. lets you create new streams (Output Stream) based on queries against the source data;
  2. provides a stream with errors that occurred while the application was running (Error Stream);
  3. can automatically detect the input data schema (it can be redefined manually if necessary).

This is not a cheap service: 0.11 USD per hour of operation, so use it carefully and delete it when you are done.

Let's connect the application to the data source.

Select the stream to connect to (airline_tickets).

Next, you need to attach a new IAM role so that the application can read from the stream and write to the stream. To do this, it is enough not to change anything in the Access permissions block.

Now let's request discovery of the data schema in the stream; to do this, click the "Discover schema" button. As a result, the IAM role will be updated (a new one will be created) and schema discovery will run on the data that has already arrived in the stream.

Now you need to go to the SQL editor. When you click this button, a window will appear asking you to start the application; choose to start it.

Insert the following simple query into the SQL editor window and click Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases, you work with tables, using INSERT statements to add records and SELECT statements to query data. In Amazon Kinesis Data Analytics, you work with streams (STREAMs) and pumps (PUMPs): continuous insert queries that move data from one in-application stream into another.

The SQL query above looks for Aeroflot tickets priced below five thousand rubles. All records that match these conditions will be placed in the DESTINATION_SQL_STREAM stream.
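
To make the filter condition concrete, here is the same predicate applied to a handful of records in plain Python. This is only an analogy: the real pump runs continuously inside Kinesis Data Analytics, and the sample records and the matches helper are invented for the example.

```python
def matches(record, max_cost=5000, gate="Aeroflot"):
    """Mirror of the WHERE clause: "cost" < 5000 AND "gate" = 'Aeroflot'."""
    return float(record["cost"]) < max_cost and record["gate"] == gate


# Invented sample records, shaped like the JSON produced by the agent.
records = [
    {"cost": "4200", "gate": "Aeroflot"},
    {"cost": "6100", "gate": "Aeroflot"},
    {"cost": "3900", "gate": "S7"},
]

# Like the SELECT list, keep only the cost and gate columns.
selected = [
    {"cost": float(record["cost"]), "gate": record["gate"]}
    for record in records
    if matches(record)
]
print(selected)
```

Only the first record passes both conditions, so it is the only one that would reach DESTINATION_SQL_STREAM.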

In the Destination block, select the special_stream stream, and in the In-application stream name drop-down list select DESTINATION_SQL_STREAM.

After these steps, the application has airline_tickets as its source and special_stream as its destination.

Creating and subscribing to an SNS topic

Go to the Simple Notification Service and create a new topic there named Airlines.

Subscribe to this topic and specify the mobile phone number to which SMS notifications will be sent.

Creating a table in DynamoDB

To store the raw data from the airline_tickets stream, let's create a table in DynamoDB with the same name. We will use record_id as the primary key.
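
For readers who prefer code to the console, the table described above could be expressed as the keyword arguments of the boto3 create_table call. This is a sketch under assumptions: the string type for record_id and the 5/5 provisioned capacity are choices made for the example, not values stated in the article.

```python
def table_definition(table_name="airline_tickets"):
    """Return create_table keyword arguments for the raw tickets table.

    Assumptions: record_id is a string, and a small provisioned capacity
    is enough for the experiment.
    """
    return {
        "TableName": table_name,
        # record_id is the partition (HASH) key, i.e. the primary key.
        "KeySchema": [{"AttributeName": "record_id", "KeyType": "HASH"}],
        "AttributeDefinitions": [
            {"AttributeName": "record_id", "AttributeType": "S"},
        ],
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5,
        },
    }


definition = table_definition()
print(definition["TableName"], definition["KeySchema"])
```

With credentials configured, boto3.client("dynamodb").create_table(**table_definition()) would submit it. Only key attributes are declared up front; the other ticket fields are simply written with each item.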

Creating the collector Lambda function

Let's create a Lambda function called Collector, whose task is to poll the airline_tickets stream and, if new records are found there, insert them into the DynamoDB table. Obviously, in addition to the default rights, this Lambda must have read access to the Kinesis data stream and write access to DynamoDB.

Creating an IAM role for the collector Lambda function

First, let's create a new IAM role for the Lambda named Lambda-TicketsProcessingRole.

For a test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable.

This Lambda should be triggered by Kinesis when new records arrive in the airline_tickets stream, so we need to add a new trigger.

All that remains is to insert the code and save the Lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
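
The decoding step can be tried locally without AWS. The sketch below builds a minimal event in the shape the handler reads (Records, then kinesis, then base64-encoded data) and decodes it the same way get_json_data does. The make_kinesis_event helper and the sample ticket are invented for the example, and a real Kinesis event carries more fields than shown here.

```python
import base64
import json


def make_kinesis_event(items):
    """Build a minimal Kinesis-style event with base64-encoded JSON payloads."""
    return {
        "Records": [
            {"kinesis": {"data": base64.b64encode(
                json.dumps(item).encode("utf-8")).decode("ascii")}}
            for item in items
        ]
    }


def decode_records(records):
    """Decode the payloads the same way TicketsParser.get_json_data does."""
    return [json.loads(base64.b64decode(record["kinesis"]["data"]))
            for record in records]


# Invented sample ticket; the values are strings, as CSVTOJSON would emit them.
ticket = {"record_id": "abc-123", "cost": "4200", "gate": "Aeroflot"}
event = make_kinesis_event([ticket])
print(decode_records(event["Records"]))
```

An event built this way can also be pasted into the Lambda console as a test event to exercise the handler before the trigger is connected.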

Creating the notifier Lambda function

The second Lambda function, which will monitor the second stream (special_stream) and send a notification to SNS, is created in a similar way. This Lambda must therefore have access to read from Kinesis and to send messages to the given SNS topic, which the SNS service will then deliver to all subscribers of this topic (email, SMS, and so on).

Creating the IAM role

First, we create the Lambda-KinesisAlarm IAM role for this Lambda, and then assign this role to the alarm_notifier Lambda being created.

This Lambda should be triggered when new records arrive in special_stream, so you need to configure the trigger in the same way as we did for the Collector Lambda.

To make this Lambda easier to configure, let's introduce a new environment variable, TOPIC_ARN, where we put the ARN (Amazon Resource Name) of the Airlines topic.

And insert the Lambda code; it is not complicated at all:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))
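
The handler above sends a fixed text and does not look at the incoming records. If the notification should mention the tickets themselves, the message could be assembled from the event first. The sketch below is one possible extension, not part of the original function; it assumes the records in special_stream are JSON objects with cost and gate fields, and build_message is a hypothetical helper.

```python
import base64
import json


def build_message(event):
    """Compose a notification text from the records of a Kinesis event.

    Assumes each record payload is JSON with "gate" and "cost" fields.
    """
    lines = []
    for record in event.get("Records", []):
        ticket = json.loads(base64.b64decode(record["kinesis"]["data"]))
        lines.append(f"{ticket['gate']}: {ticket['cost']}")
    return "Interesting tickets found:\n" + "\n".join(lines)


# Invented sample event for a local check.
payload = base64.b64encode(
    json.dumps({"cost": 4200.0, "gate": "Aeroflot"}).encode("utf-8")
).decode("ascii")
sample_event = {"Records": [{"kinesis": {"data": payload}}]}
print(build_message(sample_event))
```

The returned string could then be passed as the Message argument of SNS_CLIENT.publish in place of the fixed text.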

It seems that this completes the manual configuration of the system. All that remains is to test it and make sure that we have configured everything correctly.

Deploying from Terraform code

Required preparation

Terraform is a very convenient open-source tool for deploying infrastructure from code. It has its own syntax that is easy to learn, and there are many examples of how and what to deploy. The Atom editor and Visual Studio Code have many handy plugins that make working with Terraform easier.

You can download the distribution from the Terraform website. A detailed analysis of all of Terraform's capabilities is beyond the scope of this article, so we will limit ourselves to the main points.

How to run it

The full code of the project is in my repository (igorgorbenko/aviasales_kinesis on GitHub). We clone the repository. Before starting, make sure that you have the AWS CLI installed and configured, because Terraform will look for credentials in the ~/.aws/credentials file.

It is good practice to run the plan command before deploying the entire infrastructure to see what Terraform is about to create for us in the cloud:

terraform.exe plan

You will be prompted to enter a phone number to send notifications to. It is not necessary to enter it at this stage.

Having analyzed the execution plan, we can start creating the resources:

terraform.exe apply

After sending this command, you will again be asked to enter a phone number; type "yes" when the question about actually performing the actions appears. This will set up the entire infrastructure, carry out all the necessary EC2 configuration, deploy the Lambda functions, and so on.

After all resources have been successfully created through the Terraform code, you need to go into the details of the Kinesis Analytics application (unfortunately, I did not find how to do this directly from the code).

Start the application.

After that, you must explicitly set the in-application stream name by selecting it from the drop-down list.

Now everything is ready to go.

Testing the application

Regardless of how you deployed the system, manually or through Terraform code, it will work the same way.

We log in via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script.

sudo ./api_caller.py TOKEN

All you have to do is wait for an SMS to your number.

The SMS message arrives on the phone in about 1 minute.

It remains to check whether the records were saved in the DynamoDB database for subsequent, more detailed analysis. The airline_tickets table should now contain the ticket records.

Conclusion

In the course of this work, an online data processing system was built on top of Amazon Kinesis. We considered options for using the Kinesis Agent together with Kinesis Data Streams and real-time Kinesis Analytics using SQL commands, as well as the interaction of Amazon Kinesis with other AWS services.

We deployed the above system in two ways: a rather long manual one and a quick one from Terraform code.

The full source code of the project is available in my GitHub repository, and I suggest you familiarize yourself with it.

I will be happy to discuss the article and look forward to your comments. I hope for constructive criticism.

I wish you success!

Source: www.habr.com
