Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал

Хөөе Хабр!

Та нисэх онгоцонд дуртай юу? Би үүнд дуртай, гэхдээ өөрийгөө тусгаарлах үеэр би нэг алдартай эх сурвалж болох Aviasales-ээс онгоцны тийзний мэдээлэлд дүн шинжилгээ хийх дуртай болсон.

Өнөөдөр бид Amazon Kinesis-ийн ажилд дүн шинжилгээ хийж, бодит цагийн аналитик бүхий стриминг системийг бий болгож, Amazon DynamoDB NoSQL мэдээллийн санг үндсэн мэдээллийн сан болгон суулгаж, сонирхолтой тасалбарын SMS мэдэгдлийг тохируулах болно.

Бүх нарийн ширийн зүйл нь захын доор байна! Яв!

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал

Танилцуулга

Жишээлбэл, бидэнд хандах хэрэгтэй Aviasales API. Түүнд хандах эрх нь үнэ төлбөргүй бөгөөд ямар ч хязгаарлалтгүй бөгөөд та өгөгдөлд хандахын тулд API токеноо авахын тулд "Хөгжүүлэгчид" хэсэгт бүртгүүлэхэд л хангалттай.

Энэхүү нийтлэлийн гол зорилго нь AWS-д мэдээллийн урсгалыг ашиглах талаар ерөнхий ойлголт өгөх явдал юм; ашигласан API-аас буцаасан өгөгдөл нь яг шинэчлэгдээгүй бөгөөд кэшээс дамждаг гэдгийг бид анхаарч үздэг. Aviasales.ru болон Jetradar.com сайтуудын хэрэглэгчдийн сүүлийн 48 цагийн хайлтанд үндэслэн үүсгэсэн.

API-ээр дамжуулан хүлээн авсан үйлдвэрлэгч машин дээр суурилуулсан Kinesis-агент нь Kinesis Data Analytics-ээр дамжуулан өгөгдлийг автоматаар задлан, хүссэн урсгал руу дамжуулах болно. Энэ урсгалын түүхий хувилбарыг дэлгүүрт шууд бичих болно. DynamoDB-д байрлуулсан түүхий мэдээллийн хадгалалт нь AWS Quick Sight гэх мэт BI хэрэгслээр дамжуулан тасалбарын гүнзгий дүн шинжилгээ хийх боломжийг олгоно.

Бид бүхэл бүтэн дэд бүтцийг байрлуулах хоёр сонголтыг авч үзэх болно.

  • Гарын авлага - AWS удирдлагын консолоор дамжуулан;
  • Terraform кодын дэд бүтэц нь залхуу автомат машинуудад зориулагдсан;

Боловсруулсан системийн архитектур

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Ашигласан бүрэлдэхүүн хэсгүүд:

  • Aviasales API - энэ API-аас буцаасан өгөгдлийг дараагийн бүх ажилд ашиглах болно;
  • EC2 Үйлдвэрлэгчийн жишээ - оролтын мэдээллийн урсгалыг үүсгэх үүлэн доторх ердийн виртуал машин:
    • Kinesis агент Энэ нь Kinesis (Kinesis Data Streams эсвэл Kinesis Firehose) руу өгөгдөл цуглуулах, илгээх хялбар аргыг машин дээр суулгасан Java програм юм. Агент нь заасан сангууд дахь файлуудын багцыг байнга хянаж, Kinesis руу шинэ өгөгдлийг илгээдэг;
    • API дуудагч скрипт — API-д хүсэлт гаргаж, хариуг Kinesis Agent хянадаг хавтсанд оруулдаг Python скрипт;
  • Kinesis Data Streams - өргөн цар хүрээтэй боломж бүхий бодит цагийн өгөгдөл дамжуулах үйлчилгээ;
  • Kinesis Analytics нь сервергүй үйлчилгээ бөгөөд бодит цаг хугацаанд дамжуулж буй өгөгдөлд дүн шинжилгээ хийхийг хялбаршуулдаг. Amazon Kinesis Data Analytics нь програмын нөөцийг тохируулж, ирж буй өгөгдлийн хэмжээг автоматаар зохицуулдаг;
  • AWS Lambda — серверүүдийг нөөцлөх, тохируулахгүйгээр код ажиллуулах боломжийг олгодог үйлчилгээ. Бүх тооцоолох хүчийг дуудлага бүрт автоматаар хэмждэг;
  • Amazon DynamoDB - Ямар ч масштабтай ажиллах үед 10 миллисекундээс бага хоцролтыг хангадаг түлхүүр-утга хосын мэдээллийн сан, баримт бичиг. DynamoDB ашиглах үед та ямар нэгэн серверийг хангах, засварлах, удирдах шаардлагагүй. DynamoDB нь боломжтой нөөцийн хэмжээг тохируулах, өндөр гүйцэтгэлийг хадгалахын тулд хүснэгтүүдийг автоматаар масштабладаг. Системийн удирдлага шаардлагагүй;
  • Amazon SNS - нийтлэгч-захиалагч (Pub/Sub) загварыг ашиглан мессеж илгээх бүрэн удирдлагатай үйлчилгээ бөгөөд үүний тусламжтайгаар та микро үйлчилгээ, түгээсэн систем болон сервергүй програмуудыг тусгаарлах боломжтой. SNS нь гар утасны мэдэгдэл, SMS мессеж, имэйлээр дамжуулан эцсийн хэрэглэгчдэд мэдээлэл илгээхэд ашиглагдана.

Анхны сургалт

Өгөгдлийн урсгалыг дуурайхын тулд би Aviasales API-аас буцаасан онгоцны тасалбарын мэдээллийг ашиглахаар шийдсэн. IN баримт бичиг Өөр өөр аргуудын нэлээд өргөн жагсаалтыг авч үзье, тэдгээрийн аль нэгийг нь авч үзье - "Сар бүрийн үнийн хуанли" нь шилжүүлгийн тоогоор бүлэглэсэн сарын өдөр бүрийн үнийг харуулна. Хэрэв та хүсэлтэд хайлтын сарыг заагаагүй бол одоогийнхоос хойшхи сарын мэдээллийг буцааж өгөх болно.

За ингээд бүртгүүлээд токеноо авцгаая.

Жишээ хүсэлтийг доор харуулав.

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Хүсэлтэд токен зааж өгснөөр API-аас өгөгдөл хүлээн авах дээрх арга ажиллах боловч би хандалтын токеныг толгойгоор дамжуулахыг илүүд үздэг тул бид энэ аргыг api_caller.py скрипт дээр ашиглах болно.

Хариултын жишээ:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Дээрх жишээ API хариулт нь Санкт-Петербургээс Пук хүрэх тасалбарыг харуулж байна ... Өө, ямар мөрөөдөл вэ ...
Би Казань, Пхукет одоо "зөвхөн мөрөөдөл" болсон тул Санкт-Петербургээс Казань хүрэх тасалбар хайцгаая.

Энэ нь таныг аль хэдийн AWS бүртгэлтэй гэж үздэг. Kinesis болон SMS-ээр мэдэгдэл илгээх нь жилийн төлбөрт ороогүй гэдгийг би нэн даруй онцгойлон хэлмээр байна. Үнэгүй шат (үнэгүй хэрэглээ). Гэсэн хэдий ч, хэдэн доллар бодож, санал болгож буй системийг барьж, түүгээр тоглох бүрэн боломжтой. Мэдээжийн хэрэг, шаардлагагүй болсоны дараа бүх нөөцийг устгахаа бүү мартаарай.

Аз болоход, хэрэв бид сар бүрийн үнэ төлбөргүй хязгаарыг хангавал DynamoDb болон lambda функцууд бидэнд үнэ төлбөргүй байх болно. Жишээлбэл, DynamoDB-ийн хувьд: 25 ГБ санах ой, 25 WCU/RCU, 100 сая асуулга. Мөн сард нэг сая lambda функцийн дуудлага.

Гараар системийг байршуулах

Kinesis Data Streams-ийг тохируулж байна

Kinesis Data Streams үйлчилгээ рүү орж, тус бүрдээ нэг хэлтэрхий гэсэн хоёр шинэ урсгал үүсгэцгээе.

Хагархай гэж юу вэ?
Shard нь Amazon Kinesis урсгалын өгөгдөл дамжуулах үндсэн нэгж юм. Нэг сегмент нь 1 МБ/с хурдтай оролтын өгөгдөл дамжуулах, 2 МБ/с хурдтай гаралтын өгөгдөл дамжуулах боломжийг олгодог. Нэг сегмент нь секундэд 1000 PUT оруулгыг дэмждэг. Өгөгдлийн урсгал үүсгэх үед шаардлагатай тооны сегментийг зааж өгөх хэрэгтэй. Жишээлбэл, та хоёр сегмент бүхий өгөгдлийн урсгал үүсгэж болно. Энэ өгөгдлийн урсгал нь секундэд 2 хүртэлх PUT бичлэгийг дэмждэг 4 МБ/с хурдтай оролтын өгөгдөл дамжуулах ба 2000 МБ/с гаралттай өгөгдөл дамжуулах боломжийг олгоно.

Таны дамжуулалтад хэр олон хэлтэрхий байна, дамжуулах чадвар төдий чинээ их болно. Зарчмын хувьд, урсгалыг ингэж масштабладаг - хэлтэрхий нэмэх замаар. Гэхдээ илүү олон хэлтэрхийтэй байх тусам үнэ нь өндөр болно. Хэсэг бүр нь цагт 1,5 цент, сая PUT ачааны нэгж тутамд 1.4 цент нэмж төлдөг.

Нэртэй шинэ урсгал үүсгэцгээе онгоцны тийз, 1 хэлтэрхий түүнд хангалттай байх болно:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Одоо нэрээр өөр thread үүсгэцгээе тусгай_урсгал:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал

Үйлдвэрлэгчийн тохиргоо

Даалгаврыг шинжлэхийн тулд ердийн EC2 жишээг өгөгдөл үйлдвэрлэгч болгон ашиглахад хангалттай. Энэ нь хүчирхэг, үнэтэй виртуал машин байх албагүй; spot t2.micro нь зүгээр л ажиллах болно.

Анхаарах зүйл: Жишээлбэл, та зураг ашиглах хэрэгтэй - Amazon Linux AMI 2018.03.0, Kinesis Agent-ийг хурдан эхлүүлэх тохиргоо багатай.

EC2 үйлчилгээ рүү очиж, шинэ виртуал машин үүсгэж, үнэгүй давхаргад багтсан t2.micro төрлийн хүссэн AMI-г сонго.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Шинээр үүсгэсэн виртуал машин нь Kinesis үйлчилгээтэй харилцах боломжтой байхын тулд түүнд эрх өгөх ёстой. Үүнийг хийх хамгийн сайн арга бол IAM-ын үүргийг томилох явдал юм. Тиймээс, 3-р алхам: Instance-ийн дэлгэрэнгүй мэдээллийг тохируулах дэлгэц дээр та сонгох хэрэгтэй IAM-ийн шинэ үүрэг үүсгэнэ үү:

EC2-д IAM үүргийг бий болгож байна
Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Нээгдсэн цонхон дээр бид EC2-д шинэ үүрэг үүсгэж байгааг сонгоод Зөвшөөрлийн хэсэг рүү очно уу.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Сургалтын жишээг ашигласнаар бид нөөцийн эрхийн нарийн тохиргооны бүх нарийн ширийнийг судлах шаардлагагүй тул Amazon-аас урьдчилан тохируулсан бодлогыг сонгох болно: AmazonKinesisFullAccess болон CloudWatchFullAccess.

Энэ дүрд утга учиртай нэр өгье, жишээлбэл: EC2-KinesisStreams-FullAccess. Үр дүн нь доорх зурагт үзүүлсэнтэй ижил байх ёстой.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Энэ шинэ дүрийг үүсгэсний дараа үүнийг үүсгэсэн виртуал машины жишээнд хавсаргахаа бүү мартаарай:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Бид энэ дэлгэц дээр өөр юу ч өөрчлөхгүй бөгөөд дараагийн цонх руу шилжинэ.

Хатуу дискний тохиргоог өгөгдмөл байдлаар орхиж болно, мөн шошго (хэдийгээр шошгыг ашиглах нь сайн арга боловч ядаж жишээнд нэр өгч, орчныг зааж өгнө).

Одоо бид 6-р алхам: Аюулгүй байдлын бүлгийг тохируулах таб дээр байгаа бөгөөд энд та шинээр үүсгэх эсвэл одоо байгаа Аюулгүй байдлын бүлгээ зааж өгөх шаардлагатай бөгөөд энэ нь танд ssh (порт 22)-ээр дамжуулан инстант руу холбогдох боломжийг олгодог. Эндээс Source -> My IP-г сонгоод та жишээг ажиллуулж болно.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Энэ нь ажиллаж байгаа төлөвт шилжсэн даруйд та ssh-ээр холбогдохыг оролдож болно.

Kinesis Agent-тай ажиллахын тулд машинд амжилттай холбогдсоны дараа та терминалд дараах тушаалуудыг оруулах ёстой.

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

API хариултуудыг хадгалах хавтас үүсгэцгээе:

sudo mkdir /var/log/airline_tickets

Агентийг эхлүүлэхийн өмнө та түүний тохиргоог тохируулах хэрэгтэй:

sudo vim /etc/aws-kinesis/agent.json

agent.json файлын агуулга дараах байдлаар харагдах ёстой.

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Тохируулгын файлаас харахад агент нь /var/log/airline_tickets/ лавлах дахь .log өргөтгөлтэй файлуудыг хянаж, задлан шинжилж, агаарын тээврийн_тикетийн урсгал руу шилжүүлэх болно.

Бид үйлчилгээг дахин эхлүүлж, ажиллаж байгаа эсэхийг шалгана:

sudo service aws-kinesis-agent restart

Одоо API-аас өгөгдөл хүсэх Python скриптийг татаж авцгаая:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py скрипт нь Aviasales-ээс өгөгдөл хүсэх ба хүлээн авсан хариултыг Kinesis агент сканнердсан лавлахад хадгалдаг. Энэхүү скриптийн хэрэгжилт нь нэлээд стандарт бөгөөд TicketsApi анги байдаг бөгөөд энэ нь API-г асинхроноор татах боломжийг олгодог. Бид токен бүхий толгой хэсгийг дамжуулж, энэ ангид параметрүүдийг хүсэх болно:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Агентын зөв тохиргоо болон ажиллагааг шалгахын тулд api_caller.py скриптийг ажиллуулж үзье:

sudo ./api_caller.py TOKEN

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Мөн бид Агентын бүртгэлүүд болон агаарын тээврийн_тилетийн мэдээллийн урсгалын Хяналтын таб дээрээс ажлын үр дүнг харна.

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Таны харж байгаагаар бүх зүйл ажиллаж, Kinesis Agent дамжуулалт руу өгөгдлийг амжилттай илгээдэг. Одоо хэрэглэгчийн тохиргоог хийцгээе.

Kinesis Data Analytics-ийг тохируулж байна

Бүхэл системийн гол бүрэлдэхүүн хэсэг рүү шилжье - Kinesis Data Analytics дээр kinesis_analytics_airlines_app нэртэй шинэ програм үүсгэнэ үү:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Kinesis Data Analytics нь SQL хэлийг ашиглан Kinesis Streams-аас бодит цагийн өгөгдлийн анализ хийх боломжийг танд олгоно. Энэ нь бүрэн автомат масштабтай үйлчилгээ юм (Kinesis Streams-ээс ялгаатай):

  1. эх өгөгдлийн хүсэлт дээр үндэслэн шинэ урсгал (Output Stream) үүсгэх боломжийг танд олгоно;
  2. програмууд ажиллаж байх үед гарсан алдаатай урсгалыг өгдөг (Error Stream);
  3. оролтын өгөгдлийн схемийг автоматаар тодорхойлох боломжтой (шаардлагатай бол гараар дахин тодорхойлж болно).

Энэ бол хямд үйлчилгээ биш - нэг цагийн ажил нь 0.11 ам. доллар тул та үүнийг анхааралтай ашиглаж, дуусгасны дараа устгах хэрэгтэй.

Програмыг өгөгдлийн эх сурвалжтай холбоно уу:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Бидний холбогдох урсгалыг сонгоно уу (нисэх онгоцны тийз):

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Дараа нь та шинэ IAM үүргийг хавсаргах хэрэгтэй бөгөөд ингэснээр програм нь урсгалаас уншиж, урсгал руу бичих боломжтой болно. Үүнийг хийхийн тулд Хандалтын зөвшөөрлийн блок дээр юу ч өөрчлөхгүй байх нь хангалттай.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Одоо урсгал дахь өгөгдлийн схемийг илрүүлэх хүсэлт гаргацгаая; үүнийг хийхийн тулд "Схемийг илрүүлэх" товчийг дарна уу. Үүний үр дүнд IAM-ийн үүрэг шинэчлэгдэж (шинэ нэгийг үүсгэх болно) бөгөөд дамжуулалтад аль хэдийн ирсэн өгөгдлөөс схем илрүүлэлтийг эхлүүлэх болно:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Одоо та SQL засварлагч руу очих хэрэгтэй. Та энэ товчлуур дээр дарахад програмыг эхлүүлэхийг хүссэн цонх гарч ирнэ - эхлүүлэхийг хүсч буй зүйлээ сонгоно уу:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Дараах энгийн асуулгыг SQL засварлагчийн цонхонд оруулаад Save and Run SQL дээр дарна уу:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Харилцан хамаарлын өгөгдлийн санд та INSERT хэллэгийг ашиглан бүртгэл, асуулгад SELECT мэдэгдлийг ашиглан хүснэгтүүдтэй ажилладаг. Amazon Kinesis Data Analytics-д та урсгал (STREAMs) болон шахуургууд (PUMPs)-тай ажилладаг бөгөөд энэ нь програмын нэг урсгалаас өөр урсгал руу өгөгдөл оруулах тасралтгүй хүсэлтүүд юм.

Дээр үзүүлсэн SQL асуулга нь таван мянган рублиас бага үнэтэй Aeroflot тасалбарыг хайж байна. Эдгээр нөхцөлийг хангасан бүх бичлэгийг DESTINATION_SQL_STREAM урсгалд байрлуулах болно.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Destination блок дотроос тусгай_урсгалт урсгалыг сонгоод, програм доторх урсгалын нэр DESTINATION_SQL_STREAM унадаг жагсаалтаас:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Бүх заль мэхийн үр дүн нь доорх зурагтай төстэй байх ёстой.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал

SNS сэдэв үүсгэх, бүртгүүлэх

Энгийн мэдэгдлийн үйлчилгээ рүү очоод тэнд Airlines нэртэй шинэ сэдэв үүсгэнэ үү.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Энэ сэдэвт бүртгүүлж, SMS мэдэгдэл илгээх гар утасны дугаарыг зааж өгнө үү.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал

DynamoDB дээр хүснэгт үүсгэ

Тэдний агаарын тээврийн_тилетийн урсгалын түүхий өгөгдлийг хадгалахын тулд DynamoDB дээр ижил нэртэй хүснэгт үүсгэцгээе. Бид record_id-г үндсэн түлхүүр болгон ашиглах болно:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал

Ламбда функц цуглуулагчийг бий болгох

Цуглуулагч нэртэй ламбда функцийг үүсгэцгээе, түүний даалгавар нь агаарын тээврийн тасалбарын урсгалд санал асуулга хийх бөгөөд хэрэв тэнд шинэ бичлэг олдвол DynamoDB хүснэгтэд эдгээр бичлэгүүдийг оруулах болно. Мэдээжийн хэрэг, анхдагч эрхээс гадна энэ lambda нь Kinesis өгөгдлийн урсгал руу унших, DynamoDB руу бичих эрхтэй байх ёстой.

Коллекторын lambda функцэд IAM үүргийг бий болгож байна
Эхлээд Lambda-TicketsProcessingRole нэртэй lambda-д зориулж IAM-ийн шинэ үүргийг үүсгэцгээе:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Туршилтын жишээний хувьд урьдчилан тохируулсан AmazonKinesisReadOnlyAccess болон AmazonDynamoDBFullAccess бодлого нь доорх зурагт үзүүлсэн шиг маш тохиромжтой.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал

Airline_stream-д шинэ оруулгууд орох үед энэ lambda-г Kinesis-ийн гохоор эхлүүлэх ёстой тул бид шинэ гох нэмэх шаардлагатай:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Үлдсэн зүйл бол кодыг оруулаад lambda-г хадгалах явдал юм.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda функцийн мэдэгдэгч үүсгэх

Хоёрдахь урсгалыг (special_stream) хянаж, SNS руу мэдэгдэл илгээх хоёр дахь lambda функц нь ижил төстэй байдлаар бүтээгдсэн. Тиймээс, энэ ламбда нь Kinesis-ээс уншиж, өгөгдсөн SNS сэдэв рүү мессеж илгээх эрхтэй байх ёстой бөгөөд дараа нь SNS үйлчилгээ нь энэ сэдвийн бүх захиалагчдад (имэйл, SMS гэх мэт) илгээгдэх болно.

IAM үүргийг бий болгож байна
Эхлээд бид энэ ламбда-д зориулж IAM-ийн Lambda-KinesisAlarm үүргийг үүсгээд дараа нь үүсгэж буй alarm_notifier lambda-д энэ үүргийг онооно.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал

Энэ lambda нь тусгай_урсгал руу шинэ бичлэг оруулах гох дээр ажиллах ёстой тул та Цуглуулагч ламбдатай адил гохыг тохируулах хэрэгтэй.

Энэ lambda-г тохируулахад хялбар болгохын тулд агаарын тээврийн сэдвийн ANR (Amazon Recourse Names)-ийг байрлуулдаг TOPIC_ARN орчны шинэ хувьсагчийг танилцуулъя:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Ламбда кодыг оруулна уу, энэ нь тийм ч төвөгтэй биш юм:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Энд л гарын авлагын системийн тохиргоо дууссан бололтой. Үлдсэн зүйл бол бид бүх зүйлийг зөв тохируулсан эсэхийг шалгах явдал юм.

Terraform кодоос байршуулах

Шаардлагатай бэлтгэл

Терраформ кодоос дэд бүтцийг байрлуулахад маш тохиромжтой нээлттэй эхийн хэрэгсэл юм. Энэ нь сурахад хялбар өөрийн гэсэн синтакстай бөгөөд хэрхэн, юуг ашиглах талаар олон жишээтэй. Atom засварлагч эсвэл Visual Studio Code нь Terraform-тэй ажиллахад хялбар болгодог олон ашигтай залгаасуудтай.

Та түгээлтийг татаж авах боломжтой Эндээс. Terraform-ийн бүх чадамжийн нарийвчилсан дүн шинжилгээ нь энэ өгүүллийн хамрах хүрээнээс гадуур тул бид гол зүйлээр хязгаарлагдах болно.

Яаж эхлэх вэ

Төслийн бүрэн код нь миний хадгалах санд. Бид репозиторыг өөрсөддөө хувилдаг. Эхлэхээсээ өмнө та AWS CLI суулгаж, тохируулсан эсэхээ шалгах хэрэгтэй, учир нь... Terraform нь ~/.aws/credentials файлаас итгэмжлэл хайх болно.

Terraform одоогоор үүлэн дээр бидэнд юу бүтээж байгааг харахын тулд дэд бүтцийг бүхэлд нь байрлуулахын өмнө төлөвлөгөөний командыг ажиллуулах нь сайн туршлага юм.

terraform.exe plan

Мэдэгдэл илгээх утасны дугаараа оруулахыг танаас хүсэх болно. Энэ үе шатанд үүнийг оруулах шаардлагагүй.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Хөтөлбөрийн үйл ажиллагааны төлөвлөгөөнд дүн шинжилгээ хийсний дараа бид нөөцийг үүсгэж эхэлж болно.

terraform.exe apply

Энэ командыг илгээсний дараа танаас дахин утасны дугаар оруулахыг хүсэх бөгөөд үйлдлүүдийг бодитоор гүйцэтгэх тухай асуулт гарч ирэхэд "тийм" гэж залгана уу. Энэ нь бүхэл бүтэн дэд бүтцийг бий болгох, EC2-ийн шаардлагатай бүх тохиргоог хийх, lambda функцуудыг байрлуулах гэх мэт боломжийг олгоно.

Terraform кодоор дамжуулан бүх нөөцийг амжилттай үүсгэсний дараа та Kinesis Analytics програмын дэлгэрэнгүй мэдээллийг үзэх хэрэгтэй (харамсалтай нь би үүнийг кодоос шууд хэрхэн хийхийг олж чадаагүй).

Програмыг ажиллуулна уу:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Үүний дараа та унадаг жагсаалтаас сонгон програм доторх урсгалын нэрийг тодорхой тохируулах ёстой:

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Одоо бүх зүйл явахад бэлэн боллоо.

Програмыг туршиж байна

Та системийг гараар эсвэл Terraform кодоор хэрхэн байршуулснаас үл хамааран энэ нь адилхан ажиллах болно.

Бид Kinesis Agent суулгасан EC2 виртуал машин руу SSH-ээр нэвтэрч, api_caller.py скриптийг ажиллуулдаг.

sudo ./api_caller.py TOKEN

Та өөрийн дугаар руу SMS ирэхийг хүлээх л үлдлээ.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
SMS - бараг 1 минутын дотор таны утсанд мессеж ирдэг.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал
Бичлэгүүдийг DynamoDB мэдээллийн санд хадгалсан эсэхийг дараагийн, илүү нарийвчилсан дүн шинжилгээ хийхээр харах л үлдлээ. Онгоцны тийзний хүснэгт нь ойролцоогоор дараах өгөгдлийг агуулна.

Aviasales API-г Amazon Kinesis-тэй нэгтгэх, сервергүй хялбар байдал

дүгнэлт

Хийсэн ажлын явцад Amazon Kinesis дээр суурилсан онлайн өгөгдөл боловсруулах системийг бүтээсэн. Kinesis Agent-ийг Kinesis Data Streams болон SQL командыг ашиглан бодит цагийн аналитик Kinesis Analytics-тэй хамт ашиглах сонголтууд, мөн Amazon Kinesis-ийн бусад AWS үйлчилгээнүүдтэй харилцах талаар авч үзсэн.

Бид дээрх системийг хоёр аргаар суулгасан: нэлээд урт гарын авлага, Terraform кодоос хурдан.

Төслийн бүх эх код бэлэн байна миний GitHub репозитор дээр, Би танд үүнтэй танилцахыг санал болгож байна.

Би нийтлэлийг хэлэлцэхдээ баяртай байна, би таны сэтгэгдлийг тэсэн ядан хүлээж байна. Би бүтээлч шүүмжлэлд найдаж байна.

Чамд амжилт хүсье!

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх