Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության

Հե՜յ Հաբր։

Ձեզ դուր է գալիս թռչող ինքնաթիռներ: Ինձ դուր է գալիս, բայց ինքնամեկուսացման ժամանակ ես նաև սիրահարվեցի վերլուծել ավիատոմսերի տվյալները մեկ հայտնի ռեսուրսից՝ Aviasales-ից:

Այսօր մենք կվերլուծենք Amazon Kinesis-ի աշխատանքը, կկառուցենք հոսքային համակարգ իրական ժամանակի վերլուծություններով, կտեղադրենք Amazon DynamoDB NoSQL տվյալների բազան որպես տվյալների հիմնական պահեստ և կստեղծենք SMS ծանուցումներ հետաքրքիր տոմսերի համար:

Բոլոր մանրամասները կտրվածքի տակ են: Գնա՛

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության

Ներածություն

Օրինակ, մեզ անհրաժեշտ է մուտք դեպի Aviasales API. Դրա մուտքն ապահովվում է անվճար և առանց սահմանափակումների, պարզապես անհրաժեշտ է գրանցվել «Մշակողներ» բաժնում՝ տվյալների մուտք գործելու համար ձեր API նշանը ստանալու համար:

Այս հոդվածի հիմնական նպատակն է ընդհանուր պատկերացում տալ AWS-ում տեղեկատվության հոսքի օգտագործման մասին, մենք հաշվի ենք առնում, որ օգտագործված API-ի կողմից վերադարձված տվյալները խիստ արդիական չեն և փոխանցվում են քեշից, որը ձևավորվել է վերջին 48 ժամվա ընթացքում Aviasales.ru և Jetradar.com կայքերի օգտատերերի որոնումների հիման վրա:

Kinesis-agent-ը, որը տեղադրված է արտադրող մեքենայի վրա, ստացված API-ի միջոցով, ավտոմատ կերպով կվերլուծի և տվյալները կփոխանցի ցանկալի հոսքին Kinesis Data Analytics-ի միջոցով: Այս հոսքի հում տարբերակը կգրվի անմիջապես խանութում: DynamoDB-ում տեղակայված չմշակված տվյալների պահեստը թույլ կտա ավելի խորը վերլուծել տոմսերը BI գործիքների միջոցով, ինչպիսիք են AWS Quick Sight-ը:

Մենք կքննարկենք ամբողջ ենթակառուցվածքի տեղակայման երկու տարբերակ.

  • Ձեռնարկ - AWS կառավարման վահանակի միջոցով;
  • Terraform կոդի ենթակառուցվածքը ծույլ ավտոմատների համար է.

Մշակված համակարգի ճարտարապետությունը

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Օգտագործված բաղադրիչներ.

  • Aviasales API — այս API-ի կողմից վերադարձված տվյալները կօգտագործվեն հետագա բոլոր աշխատանքների համար.
  • EC2 Արտադրողի օրինակ — սովորական վիրտուալ մեքենա ամպի մեջ, որի վրա կստեղծվի մուտքային տվյալների հոսքը.
    • Kinesis գործակալ Java հավելված է, որը տեղադրված է մեքենայի վրա, որը տրամադրում է տվյալների հավաքագրման և ուղարկման հեշտ միջոց Kinesis-ին (Kinesis Data Streams կամ Kinesis Firehose): Գործակալը մշտապես վերահսկում է մի շարք ֆայլեր նշված գրացուցակներում և նոր տվյալներ է ուղարկում Kinesis;
    • API Caller Script — Python սկրիպտ, որը հարցումներ է կատարում API-ին և պատասխանը դնում է թղթապանակում, որը վերահսկվում է Kinesis Agent-ի կողմից.
  • Kinesis տվյալների հոսքեր — իրական ժամանակի տվյալների հոսքային ծառայություն՝ լայնածավալման հնարավորություններով.
  • Kinesis Analytics առանց սերվերի ծառայություն է, որը հեշտացնում է հոսքային տվյալների վերլուծությունը իրական ժամանակում: Amazon Kinesis Data Analytics-ը կարգավորում է հավելվածի ռեսուրսները և ինքնաբերաբար չափում է մուտքային տվյալների ցանկացած ծավալը մշակելու համար.
  • AWS Lambda — ծառայություն, որը թույլ է տալիս գործարկել կոդը՝ առանց սերվերների պահուստավորման կամ կարգավորելու: Ամբողջ հաշվողական հզորությունը ավտոմատ կերպով չափվում է յուրաքանչյուր զանգի համար.
  • Amazon DynamoDB - Բանալի-արժեք զույգերի և փաստաթղթերի տվյալների բազա, որն ապահովում է 10 միլիվայրկյանից պակաս ուշացում ցանկացած մասշտաբով աշխատելիս: DynamoDB-ն օգտագործելիս ձեզ հարկավոր չէ որևէ սերվեր տրամադրել, կարկատել կամ կառավարել: DynamoDB-ն ավտոմատ կերպով մեծացնում է աղյուսակները՝ հարմարեցնելու առկա ռեսուրսների քանակը և պահպանելու բարձր կատարողականությունը: Համակարգի կառավարում չի պահանջվում;
  • Amazon SNS - ամբողջությամբ կառավարվող ծառայություն՝ հրատարակիչ-բաժանորդ (Pub/Sub) մոդելի միջոցով հաղորդագրություններ ուղարկելու համար, որով կարող եք մեկուսացնել միկրոծառայությունները, բաշխված համակարգերը և առանց սերվերի հավելվածները: SNS-ը կարող է օգտագործվել վերջնական օգտագործողներին տեղեկատվություն ուղարկելու համար բջջային push ծանուցումների, SMS հաղորդագրությունների և էլ. նամակների միջոցով:

Նախնական ուսուցում

Տվյալների հոսքը ընդօրինակելու համար ես որոշեցի օգտագործել Aviasales API-ի կողմից վերադարձված ավիատոմսի տեղեկատվությունը: IN փաստաթղթավորում Տարբեր մեթոդների բավականին ընդարձակ ցանկ, եկեք վերցնենք դրանցից մեկը՝ «Ամսական գների օրացույց», որը վերադարձնում է գները ամսվա յուրաքանչյուր օրվա համար՝ խմբավորված ըստ փոխանցումների քանակի: Եթե ​​հարցումում չեք նշել որոնման ամիսը, ապա տեղեկատվությունը կվերադարձվի ընթացիկ ամսվան հաջորդող ամսվա համար:

Այսպիսով, եկեք գրանցվենք և ստանանք մեր նշանը:

Հարցման օրինակ՝ ստորև.

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

API-ից տվյալներ ստանալու վերը նշված մեթոդը՝ հարցումում նշան նշելով, կաշխատի, բայց ես նախընտրում եմ մուտքի նշանը փոխանցել վերնագրի միջոցով, ուստի մենք կօգտագործենք այս մեթոդը api_caller.py սկրիպտում:

Պատասխանի օրինակ.

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

API-ի վերևի պատասխանի օրինակը ցույց է տալիս Սանկտ Պետերբուրգից դեպի Ֆուկ տոմս... Օ՜, ինչ երազ...
Քանի որ ես Կազանից եմ, իսկ Պուկետն այժմ «միայն երազանք է», եկեք տոմսեր փնտրենք Սանկտ Պետերբուրգից Կազան:

Այն ենթադրում է, որ դուք արդեն ունեք AWS հաշիվ: Ուզում եմ անմիջապես հատուկ ուշադրություն հրավիրել այն փաստի վրա, որ Kinesis-ը և SMS-ով ծանուցումներ ուղարկելը ներառված չեն տարեկան Անվճար մակարդակ (անվճար օգտագործում). Բայց նույնիսկ չնայած դրան, մի երկու դոլար հաշվի առնելով, միանգամայն հնարավոր է կառուցել առաջարկվող համակարգը և խաղալ դրա հետ։ Եվ, իհարկե, մի մոռացեք ջնջել բոլոր ռեսուրսները, երբ դրանք այլևս անհրաժեշտ չեն:

Բարեբախտաբար, DynamoDb և lambda գործառույթները մեզ համար անվճար կլինեն, եթե մենք բավարարենք մեր ամսական անվճար սահմանաչափերը: Օրինակ, DynamoDB-ի համար՝ 25 ԳԲ պահեստ, 25 WCU/RCU և 100 միլիոն հարցում: Եվ մեկ միլիոն լամբդա ֆունկցիայի զանգեր ամսական:

Ձեռքով համակարգի տեղակայում

Kinesis տվյալների հոսքերի կարգավորում

Եկեք գնանք Kinesis Data Streams ծառայությանը և ստեղծենք երկու նոր հոսք՝ յուրաքանչյուրի համար մեկ բեկոր:

Ի՞նչ է բեկորը:
Բեկորը Amazon Kinesis հոսքի տվյալների փոխանցման հիմնական միավորն է: Մեկ հատվածը ապահովում է մուտքային տվյալների փոխանցում 1 ՄԲ/վ արագությամբ, իսկ ելքային տվյալների փոխանցումը 2 ՄԲ/վ արագությամբ: Մեկ հատվածն աջակցում է վայրկյանում մինչև 1000 PUT գրառում: Տվյալների հոսք ստեղծելիս անհրաժեշտ է նշել հատվածների անհրաժեշտ քանակությունը: Օրինակ, դուք կարող եք ստեղծել տվյալների հոսք երկու հատվածով: Տվյալների այս հոսքը կապահովի մուտքային տվյալների փոխանցում 2 ՄԲ/վ արագությամբ և ելքային տվյալների փոխանցում 4 ՄԲ/վ արագությամբ՝ աջակցելով մինչև 2000 PUT գրառումներ վայրկյանում:

Որքան շատ բեկորներ ձեր հոսքում, այնքան մեծ է դրա թողունակությունը: Սկզբունքորեն հոսքերն այսպես են մասշտաբվում՝ բեկորներ ավելացնելով: Բայց որքան շատ բեկորներ ունեք, այնքան բարձր է գինը: Յուրաքանչյուր բեկորն արժե 1,5 ցենտ մեկ ժամում և հավելյալ 1.4 ցենտ յուրաքանչյուր միլիոն PUT օգտակար բեռի միավորի համար:

Եկեք ստեղծենք նոր հոսք անունով ավիատոմսեր, 1 բեկորը նրան բավական կլինի.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Հիմա եկեք ստեղծենք մեկ այլ թեմա անունով հատուկ_հոսք:

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության

Պրոդյուսերի կարգավորում

Առաջադրանքը վերլուծելու համար բավական է օգտագործել սովորական EC2 օրինակը որպես տվյալների արտադրող: Պարտադիր չէ, որ դա հզոր, թանկարժեք վիրտուալ մեքենա լինի, տեղում t2.micro-ն լավ կաշխատի:

Կարևոր նշում. օրինակ, դուք պետք է օգտագործեք պատկերը՝ Amazon Linux AMI 2018.03.0, այն ունի ավելի քիչ պարամետրեր Kinesis Agent-ն արագ գործարկելու համար:

Գնացեք EC2 ծառայություն, ստեղծեք նոր վիրտուալ մեքենա, ընտրեք ցանկալի AMI t2.micro տիպով, որը ներառված է Free Tier-ում.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Որպեսզի նորաստեղծ վիրտուալ մեքենան կարողանա փոխազդել Kinesis ծառայության հետ, պետք է դրա իրավունքներ տրվեն։ Դա անելու լավագույն միջոցը IAM-ի դեր նշանակելն է: Հետևաբար, Քայլ 3. Կարգավորել օրինակի մանրամասները էկրանին, դուք պետք է ընտրեք Ստեղծեք նոր IAM դեր:

IAM դերի ստեղծում EC2-ի համար
Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Բացվող պատուհանում ընտրեք, որ մենք նոր դեր ենք ստեղծում EC2-ի համար և գնացեք Թույլտվություններ բաժին.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Օգտագործելով ուսուցման օրինակը, մենք ստիպված չենք լինի մտնել ռեսուրսների իրավունքների հատիկավոր կազմաձևման բոլոր բարդությունները, ուստի մենք կընտրենք Amazon-ի կողմից նախապես կազմաձևված քաղաքականությունները՝ AmazonKinesisFullAccess և CloudWatchFullAccess:

Եկեք այս դերին որոշ իմաստալից անուն տանք, օրինակ՝ EC2-KinesisStreams-FullAccess: Արդյունքը պետք է լինի նույնը, ինչ ցույց է տրված ստորև նկարում.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Այս նոր դերը ստեղծելուց հետո մի մոռացեք այն կցել ստեղծված վիրտուալ մեքենայի օրինակին.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Մենք այլ բան չենք փոխում այս էկրանին և անցնում հաջորդ պատուհաններին:

Կոշտ սկավառակի կարգավորումները կարելի է թողնել որպես լռելյայն, ինչպես նաև պիտակները (չնայած լավ պրակտիկա է օգտագործել պիտակներ, գոնե օրինակին անուն տալ և նշել միջավայրը):

Այժմ մենք գտնվում ենք Քայլ 6. Կարգավորել Անվտանգության խմբի ներդիրը, որտեղ դուք պետք է ստեղծեք նորը կամ նշեք ձեր գոյություն ունեցող Անվտանգության խումբը, որը թույլ է տալիս միանալ ssh-ի միջոցով (պորտ 22) օրինակին: Ընտրեք Source -> My IP-ն այնտեղ և կարող եք գործարկել օրինակը:

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Հենց որ այն անցնի գործարկման կարգավիճակի, կարող եք փորձել միանալ դրան ssh-ի միջոցով:

Kinesis Agent-ի հետ աշխատելու համար մեքենային հաջողությամբ միանալուց հետո տերմինալում պետք է մուտքագրեք հետևյալ հրամանները.

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Եկեք ստեղծենք թղթապանակ՝ API-ի պատասխանները պահելու համար.

sudo mkdir /var/log/airline_tickets

Գործակալը գործարկելուց առաջ դուք պետք է կարգավորեք դրա կազմաձևը.

sudo vim /etc/aws-kinesis/agent.json

Agent.json ֆայլի բովանդակությունը պետք է այսպիսին լինի.

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Ինչպես երևում է կազմաձևման ֆայլից, գործակալը կվերահսկի .log ընդլայնմամբ ֆայլերը /var/log/airline_tickets/ գրացուցակում, կվերլուծի դրանք և կփոխանցի airline_tickets հոսքին:

Մենք վերագործարկում ենք ծառայությունը և համոզվում, որ այն աշխատում և աշխատում է.

sudo service aws-kinesis-agent restart

Այժմ ներբեռնենք Python սկրիպտը, որը տվյալներ կպահանջի API-ից.

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Api_caller.py սկրիպտը տվյալներ է խնդրում Aviasales-ից և ստացված պատասխանը պահպանում է այն գրացուցակում, որը սկանավորում է Kinesis գործակալը: Այս սցենարի իրականացումը բավականին ստանդարտ է, կա TicketsApi դաս, այն թույլ է տալիս ասինխրոն կերպով քաշել API-ն։ Մենք փոխանցում ենք վերնագիր նշանով և պարամետրեր ենք խնդրում այս դասին.

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Գործակալի ճիշտ կարգավորումներն ու ֆունկցիոնալությունը ստուգելու համար եկեք փորձարկենք api_caller.py սկրիպտը.

sudo ./api_caller.py TOKEN

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Եվ մենք աշխատանքի արդյունքին նայում ենք Գործակալների տեղեկամատյաններում և «Մոնիտորինգ» ներդիրում՝ airline_tickets տվյալների հոսքում.

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Ինչպես տեսնում եք, ամեն ինչ աշխատում է, և Kinesis Agent-ը հաջողությամբ տվյալներ է ուղարկում հոսքին: Այժմ եկեք կարգավորենք սպառողը:

Kinesis Data Analytics-ի կարգավորում

Եկեք անցնենք ամբողջ համակարգի կենտրոնական բաղադրիչին. ստեղծեք նոր հավելված Kinesis Data Analytics-ում, որը կոչվում է kinesis_analytics_airlines_app:

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Kinesis Data Analytics-ը թույլ է տալիս իրական ժամանակում տվյալների վերլուծություն կատարել Kinesis Streams-ից՝ օգտագործելով SQL լեզուն: Դա լիովին ավտոմատ մասշտաբային ծառայություն է (ի տարբերություն Kinesis Streams-ի), որը.

  1. թույլ է տալիս ստեղծել նոր հոսքեր (Ելքային հոսք)՝ հիմնվելով տվյալների աղբյուրի հարցումների վրա.
  2. ապահովում է հոսքային սխալներով, որոնք տեղի են ունեցել հավելվածների աշխատանքի ընթացքում (Error Stream);
  3. կարող է ավտոմատ կերպով որոշել մուտքային տվյալների սխեման (անհրաժեշտության դեպքում այն ​​կարող է ձեռքով վերասահմանվել):

Սա էժան ծառայություն չէ՝ 0.11 ԱՄՆ դոլար մեկ ժամ աշխատանքի համար, այնպես որ դուք պետք է ուշադիր օգտագործեք այն և ջնջեք այն, երբ ավարտեք:

Եկեք միացնենք հավելվածը տվյալների աղբյուրին.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Ընտրեք հոսքը, որին մենք պատրաստվում ենք միանալ (ավիաընկերության_տոմսեր).

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Հաջորդը, դուք պետք է կցեք նոր IAM Role, որպեսզի հավելվածը կարողանա կարդալ հոսքից և գրել հոսքին: Դա անելու համար բավական է ոչինչ չփոխել Մուտքի թույլտվությունների բլոկում.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Այժմ եկեք խնդրենք հայտնաբերել տվյալների սխեման հոսքում, դա անելու համար սեղմեք «Բացահայտեք սխեման» կոճակը: Արդյունքում, IAM դերը կթարմացվի (կստեղծվի նորը), և սխեմայի հայտնաբերումը կգործարկվի հոսքում արդեն հասած տվյալներից.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Այժմ դուք պետք է գնաք SQL խմբագիր: Երբ սեղմում եք այս կոճակը, կհայտնվի պատուհան՝ խնդրելով գործարկել հավելվածը. ընտրեք այն, ինչ ցանկանում եք գործարկել.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Տեղադրեք հետևյալ պարզ հարցումը SQL խմբագրիչի պատուհանում և սեղմեք Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Հարաբերական տվյալների բազաներում դուք աշխատում եք աղյուսակների հետ՝ օգտագործելով INSERT հայտարարությունները՝ գրառումներ ավելացնելու համար, և SELECT դրույթ՝ տվյալների հարցումների համար: Amazon Kinesis Data Analytics-ում դուք աշխատում եք հոսքերի (STREAM) և պոմպերի (PUMPs) հետ՝ շարունակական ներդիրների հարցումներ, որոնք հավելվածի մի հոսքից տվյալներ են ներմուծում մեկ այլ հոսքի մեջ:

Վերևում ներկայացված SQL հարցումը փնտրում է «Աերոֆլոտ»-ի տոմսեր հինգ հազար ռուբլուց ցածր արժեքով: Այս պայմաններին համապատասխանող բոլոր գրառումները կտեղադրվեն DESTINATION_SQL_STREAM հոսքում:

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Destination բլոկում ընտրեք special_stream հոսքը, իսկ ներծրագրային հոսքի անվան DESTINATION_SQL_STREAM բացվող ցանկում՝

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Բոլոր մանիպուլյացիաների արդյունքը պետք է նման լինի ստորև ներկայացված նկարին.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության

SNS թեմայի ստեղծում և բաժանորդագրում

Գնացեք պարզ ծանուցման ծառայություն և այնտեղ նոր թեմա ստեղծեք Ավիաուղիներ անունով.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Բաժանորդագրվեք այս թեմային և նշեք բջջային հեռախոսահամարը, որին կուղարկվեն SMS ծանուցումներ.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության

Ստեղծեք աղյուսակ DynamoDB-ում

Իրենց airline_tickets հոսքից չմշակված տվյալները պահելու համար եկեք DynamoDB-ում ստեղծենք նույն անունով աղյուսակ: Մենք կօգտագործենք record_id որպես հիմնական բանալի.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության

Լամբդա ֆունկցիայի կոլեկցիոների ստեղծում

Եկեք ստեղծենք լամբդա ֆունկցիա, որը կոչվում է Collector, որի խնդիրն է լինելու հարցում կատարել airline_tickets հոսքի վրա և, եթե այնտեղ նոր գրառումներ հայտնաբերվեն, տեղադրենք այդ գրառումները DynamoDB աղյուսակում: Ակնհայտ է, որ ի լրումն լռելյայն իրավունքների, այս լամբդան պետք է ունենա կարդալու մուտք դեպի Kinesis տվյալների հոսք և գրելու մուտք դեպի DynamoDB:

Կոլեկցիոների լամբդա ֆունկցիայի համար IAM դերի ստեղծում
Նախ, եկեք ստեղծենք նոր IAM դեր լամբդայի համար Lambda-TicketsProcessingRole անունով:

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Փորձարկման օրինակի համար նախապես կազմաձևված AmazonKinesisReadOnlyAccess և AmazonDynamoDBFullAccess քաղաքականությունները բավականին հարմար են, ինչպես ցույց է տրված ստորև նկարում.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության

Այս lambda-ն պետք է գործարկվի Kinesis-ի ձգանով, երբ նոր մուտքերը մտնում են airline_stream, այնպես որ մենք պետք է ավելացնենք նոր ձգան.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Մնում է միայն տեղադրել կոդը և պահպանել լամբդան:

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda ֆունկցիայի ծանուցիչի ստեղծում

Նմանատիպ ձևով է ստեղծվել նաև երկրորդ լամբդա ֆունկցիան, որը կվերահսկի երկրորդ հոսքը (special_stream) և ծանուցում կուղարկի SNS-ին։ Հետևաբար, այս լամբդան պետք է մուտք ունենա Kinesis-ից կարդալու և հաղորդագրություններ ուղարկելու տվյալ SNS թեմային, որոնք այնուհետև SNS ծառայության կողմից կուղարկվեն այս թեմայի բոլոր բաժանորդներին (էլ., SMS և այլն):

IAM դերի ստեղծում
Նախ, մենք ստեղծում ենք IAM դերը Lambda-KinesisAlarm այս lambda-ի համար, այնուհետև այս դերը վերագրում ենք ստեղծվող alarm_notifier lambda-ին.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության

Այս lambda-ն պետք է աշխատի գործարկիչի վրա, որպեսզի նոր գրառումները մուտքագրվեն special_stream, այնպես որ դուք պետք է կարգավորեք ձգանն այնպես, ինչպես մենք արեցինք Collector lambda-ի համար:

Այս lambda-ի կարգավորումն ավելի հեշտ դարձնելու համար եկեք ներկայացնենք նոր միջավայրի փոփոխական՝ TOPIC_ARN, որտեղ մենք տեղադրում ենք ANR (Amazon Recourse Names) ավիաընկերությունների թեման.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Եվ տեղադրեք լամբդա կոդը, դա ամենևին էլ բարդ չէ.

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Թվում է, թե այստեղ ավարտված է ձեռքով համակարգի կազմաձևումը: Մնում է միայն փորձարկել և համոզվել, որ մենք ամեն ինչ ճիշտ ենք կարգավորել։

Տեղադրեք Terraform կոդից

Անհրաժեշտ նախապատրաստություն

Terraform շատ հարմար բաց կոդով գործիք է կոդից ենթակառուցվածքը տեղակայելու համար: Այն ունի իր սեփական շարահյուսությունը, որը հեշտ է սովորել և ունի բազմաթիվ օրինակներ, թե ինչպես և ինչ տեղակայել: Atom խմբագրիչը կամ Visual Studio Code-ն ունի շատ հարմար պլագիններ, որոնք հեշտացնում են Terraform-ի հետ աշխատելը:

Դուք կարող եք ներբեռնել բաշխումը ուստի. Terraform-ի բոլոր հնարավորությունների մանրամասն վերլուծությունը դուրս է այս հոդվածի շրջանակներից, ուստի մենք կսահմանափակվենք հիմնական կետերով:

Ինչպես սկսել

Նախագծի ամբողջական կոդը իմ պահոցում. Մենք կլոնավորում ենք պահոցը մեզ համար: Նախքան սկսելը, դուք պետք է համոզվեք, որ դուք ունեք AWS CLI տեղադրված և կազմաձևված, քանի որ... Terraform-ը կփնտրի հավատարմագրերը ~/.aws/credentials ֆայլում:

Լավ պրակտիկա է պլանի հրամանը գործարկել նախքան ամբողջ ենթակառուցվածքը տեղակայելը, որպեսզի տեսնենք, թե Terraform-ը ներկայումս ինչ է ստեղծում մեզ համար ամպի մեջ.

terraform.exe plan

Ձեզ կառաջարկվի մուտքագրել հեռախոսահամար՝ ծանուցումներ ուղարկելու համար: Այս փուլում այն ​​մուտքագրել պարտադիր չէ։

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Վերլուծելով ծրագրի գործառնական պլանը՝ կարող ենք սկսել ռեսուրսներ ստեղծել.

terraform.exe apply

Այս հրամանն ուղարկելուց հետո ձեզ նորից կառաջարկվի մուտքագրել հեռախոսահամար, հավաքեք «այո», երբ ցուցադրվի գործողությունների իրական կատարման վերաբերյալ հարց: Սա թույլ կտա կարգավորել ամբողջ ենթակառուցվածքը, իրականացնել EC2-ի բոլոր անհրաժեշտ կոնֆիգուրացիան, տեղակայել լամբդա ֆունկցիաները և այլն:

Այն բանից հետո, երբ բոլոր ռեսուրսները հաջողությամբ ստեղծվեն Terraform կոդի միջոցով, դուք պետք է մտնեք Kinesis Analytics հավելվածի մանրամասները (ցավոք, ես չգտա, թե ինչպես դա անել անմիջապես կոդից):

Գործարկել հավելվածը.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Դրանից հետո դուք պետք է հստակորեն սահմանեք ներծրագրային հոսքի անունը՝ բացվող ցանկից ընտրելով.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Այժմ ամեն ինչ պատրաստ է գնալու։

Դիմումի փորձարկում

Անկախ նրանից, թե ինչպես եք տեղադրել համակարգը, ձեռքով կամ Terraform կոդի միջոցով, այն կաշխատի նույնը:

Մենք SSH-ի միջոցով մուտք ենք գործում EC2 վիրտուալ մեքենա, որտեղ տեղադրված է Kinesis Agent-ը և գործարկում api_caller.py սկրիպտը:

sudo ./api_caller.py TOKEN

Ընդամենը պետք է սպասել SMS հաղորդագրությանը ձեր համարին.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
SMS - հաղորդագրությունը հասնում է հեռախոսին գրեթե 1 րոպեում.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության
Մնում է տեսնել, թե արդյոք գրառումները պահպանվել են DynamoDB տվյալների բազայում հետագա, ավելի մանրամասն վերլուծության համար: Ավիատոմսերի աղյուսակը պարունակում է մոտավորապես հետևյալ տվյալները.

Aviasales API-ի ինտեգրում Amazon Kinesis-ի և առանց սերվերի պարզության

Ամփոփում

Կատարված աշխատանքների ընթացքում կառուցվել է առցանց տվյալների մշակման համակարգ՝ հիմնված Amazon Kinesis-ի վրա։ Դիտարկվել են Kinesis Agent-ի օգտագործման տարբերակները Kinesis Data Streams-ի և իրական ժամանակի վերլուծության Kinesis Analytics-ի հետ՝ օգտագործելով SQL հրամանները, ինչպես նաև Amazon Kinesis-ի փոխազդեցությունը AWS այլ ծառայությունների հետ:

Մենք գործարկեցինք վերը նշված համակարգը երկու եղանակով՝ բավականին երկար ձեռնարկ և արագ՝ Terraform կոդից:

Բոլոր նախագծի սկզբնական կոդը հասանելի է իմ GitHub պահոցում, առաջարկում եմ ծանոթանալ դրան։

Ուրախ եմ հոդվածը քննարկելու համար, անհամբեր սպասում եմ ձեր մեկնաբանություններին: Կառուցողական քննադատության հույս ունեմ։

Ձեզ հաջողություն եմ ցանկանում!

Source: www.habr.com

Добавить комментарий