การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์

เฮ้ ฮับ!

คุณชอบบินเครื่องบินไหม? ฉันชอบมัน แต่ระหว่างการแยกตัวเอง ฉันก็หลงรักการวิเคราะห์ข้อมูลเกี่ยวกับตั๋วเครื่องบินจากแหล่งข้อมูลชื่อดังแห่งหนึ่ง - Aviasales

วันนี้เราจะวิเคราะห์การทำงานของ Amazon Kinesis, สร้างระบบสตรีมมิ่งด้วยการวิเคราะห์แบบเรียลไทม์, ติดตั้งฐานข้อมูล Amazon DynamoDB NoSQL เป็นที่จัดเก็บข้อมูลหลัก และตั้งค่าการแจ้งเตือนทาง SMS สำหรับตั๋วที่น่าสนใจ

รายละเอียดทั้งหมดอยู่ภายใต้การตัด! ไป!

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์

การแนะนำ

ตัวอย่างเช่น เราจำเป็นต้องเข้าถึง Aviasales API. เข้าถึงได้โดยไม่มีค่าใช้จ่ายและไม่มีข้อจำกัด คุณเพียงแค่ต้องลงทะเบียนในส่วน "นักพัฒนา" เพื่อรับโทเค็น API ของคุณเพื่อเข้าถึงข้อมูล

วัตถุประสงค์หลักของบทความนี้คือเพื่อให้ความเข้าใจทั่วไปเกี่ยวกับการใช้การสตรีมข้อมูลใน AWS เราคำนึงว่าข้อมูลที่ส่งคืนโดย API ที่ใช้นั้นไม่ทันสมัยอย่างเคร่งครัดและถูกส่งจากแคชซึ่งก็คือ สร้างขึ้นจากการค้นหาโดยผู้ใช้เว็บไซต์ Aviasales.ru และ Jetradar.com ในช่วง 48 ชั่วโมงที่ผ่านมา

Kinesis-agent ที่ติดตั้งบนเครื่องที่ผลิต ซึ่งได้รับผ่าน API จะแยกวิเคราะห์และส่งข้อมูลไปยังสตรีมที่ต้องการโดยอัตโนมัติผ่าน Kinesis Data Analytics เวอร์ชัน Raw ของสตรีมนี้จะถูกเขียนไปยังร้านค้าโดยตรง พื้นที่จัดเก็บข้อมูลดิบที่ใช้งานใน DynamoDB จะช่วยให้วิเคราะห์ตั๋วได้ลึกยิ่งขึ้นผ่านเครื่องมือ BI เช่น AWS Quick Sight

เราจะพิจารณาสองทางเลือกสำหรับการปรับใช้โครงสร้างพื้นฐานทั้งหมด:

  • คู่มือ - ผ่าน AWS Management Console;
  • โครงสร้างพื้นฐานจากโค้ด Terraform นั้นมีไว้สำหรับผู้สร้างอัตโนมัติที่ขี้เกียจ

สถาปัตยกรรมของระบบที่พัฒนาแล้ว

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
ส่วนประกอบที่ใช้:

  • Aviasales API — ข้อมูลที่ส่งคืนโดย API นี้จะถูกนำมาใช้สำหรับงานต่อๆ ไปทั้งหมด
  • อินสแตนซ์ผู้ผลิต EC2 — เครื่องเสมือนปกติในระบบคลาวด์ที่จะสร้างสตรีมข้อมูลอินพุต:
    • ตัวแทน Kinesis เป็นแอปพลิเคชัน Java ที่ติดตั้งในเครื่องซึ่งมอบวิธีง่ายๆ ในการรวบรวมและส่งข้อมูลไปยัง Kinesis (Kinesis Data Streams หรือ Kinesis Firehose) เอเจนต์จะตรวจสอบชุดของไฟล์ในไดเร็กทอรีที่ระบุอย่างต่อเนื่องและส่งข้อมูลใหม่ไปยัง Kinesis
    • สคริปต์ผู้เรียก API — สคริปต์ Python ที่สร้างคำขอไปยัง API และใส่การตอบกลับลงในโฟลเดอร์ที่ Kinesis Agent ตรวจสอบ
  • สตรีมข้อมูล Kinesis — บริการสตรีมข้อมูลแบบเรียลไทม์พร้อมความสามารถในการขยายขนาดที่กว้าง
  • การวิเคราะห์ไคเนซิส เป็นบริการแบบไร้เซิร์ฟเวอร์ที่ทำให้การวิเคราะห์ข้อมูลสตรีมมิ่งแบบเรียลไทม์ง่ายขึ้น Amazon Kinesis Data Analytics กำหนดค่าทรัพยากรแอปพลิเคชันและปรับขนาดโดยอัตโนมัติเพื่อรองรับปริมาณข้อมูลขาเข้า
  • AWS แลมบ์ดา — บริการที่ให้คุณรันโค้ดโดยไม่ต้องสำรองข้อมูลหรือตั้งค่าเซิร์ฟเวอร์ พลังการประมวลผลทั้งหมดจะถูกปรับขนาดโดยอัตโนมัติสำหรับการโทรแต่ละครั้ง
  • อเมซอน ไดนาโมดีบี - ฐานข้อมูลของคู่คีย์-ค่าและเอกสารที่ให้เวลาแฝงน้อยกว่า 10 มิลลิวินาทีเมื่อทำงานในทุกขนาด เมื่อใช้ DynamoDB คุณไม่จำเป็นต้องจัดเตรียม แพตช์ หรือจัดการเซิร์ฟเวอร์ใดๆ DynamoDB ปรับขนาดตารางโดยอัตโนมัติเพื่อปรับจำนวนทรัพยากรที่มีอยู่และรักษาประสิทธิภาพสูง ไม่จำเป็นต้องมีการดูแลระบบ
  • อเมซอน SNS - บริการที่มีการจัดการเต็มรูปแบบสำหรับการส่งข้อความโดยใช้โมเดลผู้เผยแพร่-สมาชิก (Pub/Sub) ซึ่งคุณสามารถแยกไมโครเซอร์วิส ระบบแบบกระจาย และแอปพลิเคชันแบบไร้เซิร์ฟเวอร์ได้ SNS สามารถใช้เพื่อส่งข้อมูลไปยังผู้ใช้ผ่านการแจ้งเตือนแบบพุชบนมือถือ ข้อความ SMS และอีเมล

การฝึกอบรมเบื้องต้น

เพื่อจำลองกระแสข้อมูล ฉันตัดสินใจใช้ข้อมูลตั๋วเครื่องบินที่ส่งคืนโดย Aviasales API ใน เอกสาร รายการวิธีการที่แตกต่างกันค่อนข้างมาก มาดูวิธีใดวิธีหนึ่งกัน - “ปฏิทินราคารายเดือน” ซึ่งส่งคืนราคาในแต่ละวันของเดือน โดยจัดกลุ่มตามจำนวนการโอน หากคุณไม่ระบุเดือนการค้นหาในคำขอ ข้อมูลจะถูกส่งกลับสำหรับเดือนถัดจากเดือนปัจจุบัน

ดังนั้น มาลงทะเบียนและรับโทเค็นของเรากันดีกว่า

คำขอตัวอย่างอยู่ด้านล่าง:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

วิธีการรับข้อมูลจาก API ข้างต้นโดยการระบุโทเค็นในคำขอนั้นใช้ได้ แต่ฉันต้องการส่งโทเค็นการเข้าถึงผ่านส่วนหัว ดังนั้นเราจะใช้วิธีนี้ในสคริปต์ api_caller.py

ตัวอย่างคำตอบ:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

ตัวอย่างการตอบสนองของ API ด้านบนแสดงตั๋วจากเซนต์ปีเตอร์สเบิร์กถึงฟุก... โอ้ ช่างเป็นความฝัน...
เนื่องจากฉันมาจากคาซาน และภูเก็ตตอนนี้เป็น "เพียงความฝัน" เรามาดูตั๋วจากเซนต์ปีเตอร์สเบิร์กไปคาซานกันดีกว่า

จะถือว่าคุณมีบัญชี AWS อยู่แล้ว ฉันอยากจะแจ้งให้ทราบเป็นพิเศษทันทีว่า Kinesis และการส่งการแจ้งเตือนผ่าน SMS ไม่รวมอยู่ในรายปี ฟรีเทียร์ (ใช้งานฟรี). แต่ถึงแม้จะมีเงินสองสามดอลลาร์อยู่ในใจ แต่ก็ค่อนข้างเป็นไปได้ที่จะสร้างระบบที่เสนอและเล่นกับมัน และแน่นอนว่าอย่าลืมลบทรัพยากรทั้งหมดหลังจากที่ไม่จำเป็นต้องใช้อีกต่อไป

โชคดีที่ฟังก์ชัน DynamoDb และ lambda จะให้บริการฟรีหากเราใช้จ่ายถึงขีดจำกัดการใช้งานฟรีรายเดือนของเรา ตัวอย่างเช่น สำหรับ DynamoDB: พื้นที่จัดเก็บ 25 GB, 25 WCU/RCU และ 100 ล้านข้อความค้นหา และการเรียกใช้ฟังก์ชัน lambda ล้านครั้งต่อเดือน

การปรับใช้ระบบด้วยตนเอง

การตั้งค่า Kinesis Data Stream

ไปที่บริการ Kinesis Data Streams และสร้างสตรีมใหม่สองสตรีม โดยแต่ละสตรีมมีหนึ่งส่วน

เศษคืออะไร?
ชาร์ดคือหน่วยถ่ายโอนข้อมูลพื้นฐานของสตรีม Amazon Kinesis ส่วนหนึ่งมีการถ่ายโอนข้อมูลอินพุตที่ความเร็ว 1 MB/s และการถ่ายโอนข้อมูลเอาท์พุตที่ความเร็ว 2 MB/s หนึ่งเซ็กเมนต์รองรับรายการ PUT สูงสุด 1000 รายการต่อวินาที เมื่อสร้างสตรีมข้อมูล คุณต้องระบุจำนวนกลุ่มที่ต้องการ ตัวอย่างเช่น คุณสามารถสร้างสตรีมข้อมูลที่มีสองส่วนได้ สตรีมข้อมูลนี้จะให้การถ่ายโอนข้อมูลอินพุตที่ 2 MB/s และการถ่ายโอนข้อมูลเอาท์พุตที่ 4 MB/s ซึ่งรองรับบันทึก PUT สูงสุด 2000 รายการต่อวินาที

ยิ่งมีส่วนแบ่งข้อมูลในสตรีมของคุณมากเท่าใด ปริมาณงานก็จะยิ่งมากขึ้นเท่านั้น โดยหลักการแล้ว นี่คือวิธีการปรับขนาดโฟลว์ - โดยการเพิ่มส่วนย่อย แต่ยิ่งคุณมีเศษมากเท่าไร ราคาก็ยิ่งสูงขึ้นเท่านั้น แต่ละชาร์ดมีราคา 1,5 เซนต์ต่อชั่วโมง และเพิ่มอีก 1.4 เซนต์สำหรับเพย์โหลด PUT ทุก ๆ ล้านหน่วย

มาสร้างกระแสใหม่ด้วยชื่อกันเถอะ ตั๋วเครื่องบิน1 ชิ้นก็เพียงพอสำหรับเขา:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
ตอนนี้เรามาสร้างหัวข้อใหม่โดยใช้ชื่อกัน พิเศษ_สตรีม:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์

การตั้งค่าผู้ผลิต

ในการวิเคราะห์งาน การใช้ EC2 instance ทั่วไปเป็นผู้ผลิตข้อมูลก็เพียงพอแล้ว ไม่จำเป็นต้องเป็นเครื่องเสมือนที่ทรงพลังและมีราคาแพง สปอต t2.micro ก็ทำได้ดี

หมายเหตุสำคัญ: ตัวอย่างเช่น คุณควรใช้อิมเมจ - Amazon Linux AMI 2018.03.0 โดยมีการตั้งค่าน้อยกว่าสำหรับการเปิดใช้ Kinesis Agent อย่างรวดเร็ว

ไปที่บริการ EC2 สร้างเครื่องเสมือนใหม่ เลือก AMI ที่ต้องการด้วยประเภท t2.micro ซึ่งรวมอยู่ใน Free Tier:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
เพื่อให้เครื่องเสมือนที่สร้างขึ้นใหม่สามารถโต้ตอบกับบริการ Kinesis ได้ จะต้องได้รับสิทธิ์ในการดำเนินการดังกล่าว วิธีที่ดีที่สุดในการทำเช่นนี้คือการกำหนดบทบาท IAM ดังนั้น ในจอภาพ ขั้นตอนที่ 3: กำหนดค่ารายละเอียดอินสแตนซ์ คุณควรเลือก สร้างบทบาท IAM ใหม่:

การสร้างบทบาท IAM สำหรับ EC2
การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
ในหน้าต่างที่เปิดขึ้น ให้เลือกว่าเรากำลังสร้างบทบาทใหม่สำหรับ EC2 และไปที่ส่วนสิทธิ์:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
จากตัวอย่างการฝึกอบรม เราไม่จำเป็นต้องเจาะลึกรายละเอียดปลีกย่อยทั้งหมดของการกำหนดค่าสิทธิ์ทรัพยากร ดังนั้นเราจะเลือกนโยบายที่ Amazon กำหนดค่าไว้ล่วงหน้า: AmazonKinesisFullAccess และ CloudWatchFullAccess

เรามาตั้งชื่อที่มีความหมายสำหรับบทบาทนี้กัน เช่น: EC2-KinesisStreams-FullAccess ผลลัพธ์ควรจะเหมือนกับที่แสดงในภาพด้านล่าง:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
หลังจากสร้างบทบาทใหม่นี้แล้ว อย่าลืมแนบบทบาทเข้ากับอินสแตนซ์เครื่องเสมือนที่สร้างขึ้น:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
เราไม่เปลี่ยนแปลงสิ่งอื่นใดบนหน้าจอนี้และไปยังหน้าต่างถัดไป

คุณสามารถปล่อยให้การตั้งค่าฮาร์ดไดรฟ์เป็นค่าเริ่มต้นได้ เช่นเดียวกับแท็ก (แม้ว่าจะเป็นวิธีปฏิบัติที่ดีที่จะใช้แท็ก แต่อย่างน้อยก็ตั้งชื่ออินสแตนซ์และระบุสภาพแวดล้อม)

ตอนนี้เราอยู่ในขั้นตอนที่ 6: แท็บกำหนดค่ากลุ่มความปลอดภัย ซึ่งคุณจะต้องสร้างใหม่หรือระบุกลุ่มความปลอดภัยที่มีอยู่ ซึ่งช่วยให้คุณเชื่อมต่อผ่าน ssh (พอร์ต 22) ไปยังอินสแตนซ์ได้ เลือกแหล่งที่มา -> IP ของฉันที่นั่น และคุณสามารถเปิดใช้งานอินสแตนซ์ได้

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
ทันทีที่เปลี่ยนเป็นสถานะการทำงาน คุณสามารถลองเชื่อมต่อผ่าน ssh ได้

เพื่อให้สามารถทำงานร่วมกับ Kinesis Agent ได้ หลังจากเชื่อมต่อกับเครื่องสำเร็จแล้ว คุณต้องป้อนคำสั่งต่อไปนี้ในเทอร์มินัล:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

มาสร้างโฟลเดอร์เพื่อบันทึกการตอบกลับ API:

sudo mkdir /var/log/airline_tickets

ก่อนที่จะเริ่มเอเจนต์ คุณต้องกำหนดค่าการกำหนดค่า:

sudo vim /etc/aws-kinesis/agent.json

เนื้อหาของไฟล์ agent.json ควรมีลักษณะดังนี้:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

ดังที่เห็นได้จากไฟล์การกำหนดค่า เอเจนต์จะตรวจสอบไฟล์ที่มีนามสกุล .log ในไดเร็กทอรี /var/log/airline_tickets/ จากนั้นแยกวิเคราะห์และถ่ายโอนไปยังสตรีม airline_tickets

เรารีสตาร์ทบริการและตรวจสอบให้แน่ใจว่าบริการนั้นเปิดใช้งานอยู่:

sudo service aws-kinesis-agent restart

ตอนนี้เรามาดาวน์โหลดสคริปต์ Python ที่จะขอข้อมูลจาก API กัน:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

สคริปต์ api_caller.py ร้องขอข้อมูลจาก Aviasales และบันทึกการตอบสนองที่ได้รับในไดเร็กทอรีที่ตัวแทน Kinesis สแกน การใช้งานสคริปต์นี้ค่อนข้างเป็นมาตรฐาน มีคลาส TicketsApi ซึ่งช่วยให้คุณสามารถดึง API แบบอะซิงโครนัสได้ เราส่งส่วนหัวที่มีโทเค็นและขอพารามิเตอร์ไปยังคลาสนี้:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

เพื่อทดสอบการตั้งค่าและการทำงานที่ถูกต้องของเอเจนต์ ให้ทดสอบรันสคริปต์ api_caller.py:

sudo ./api_caller.py TOKEN

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
และเราดูผลลัพธ์ของการทำงานในบันทึกของตัวแทนและในแท็บการตรวจสอบในสตรีมข้อมูล airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
อย่างที่คุณเห็น ทุกอย่างทำงานได้และ Kinesis Agent ส่งข้อมูลไปยังสตรีมได้สำเร็จ ตอนนี้เรามากำหนดค่าคอนซูเมอร์กันดีกว่า

การตั้งค่าการวิเคราะห์ข้อมูล Kinesis

มาดูส่วนประกอบส่วนกลางของทั้งระบบกัน - สร้างแอปพลิเคชันใหม่ใน Kinesis Data Analytics ชื่อ kinesis_analytics_airlines_app:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
Kinesis Data Analytics ช่วยให้คุณดำเนินการวิเคราะห์ข้อมูลแบบเรียลไทม์จาก Kinesis Streams โดยใช้ภาษา SQL เป็นบริการปรับขนาดอัตโนมัติโดยสมบูรณ์ (ไม่เหมือนกับ Kinesis Streams) ที่:

  1. ช่วยให้คุณสร้างสตรีมใหม่ (Output Stream) ตามคำขอไปยังแหล่งข้อมูล
  2. จัดเตรียมสตรีมพร้อมข้อผิดพลาดที่เกิดขึ้นในขณะที่แอปพลิเคชันกำลังทำงาน (สตรีมข้อผิดพลาด)
  3. สามารถกำหนดโครงร่างข้อมูลอินพุตได้โดยอัตโนมัติ (สามารถกำหนดใหม่ได้ด้วยตนเองหากจำเป็น)

นี่ไม่ใช่บริการราคาถูก - 0.11 USD ต่อชั่วโมงการทำงาน ดังนั้นคุณควรใช้อย่างระมัดระวังและลบออกเมื่อดำเนินการเสร็จ

มาเชื่อมต่อแอปพลิเคชันกับแหล่งข้อมูล:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
เลือกสตรีมที่เราจะเชื่อมต่อด้วย (airline_tickets):

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
ถัดไป คุณต้องแนบบทบาท IAM ใหม่เพื่อให้แอปพลิเคชันสามารถอ่านจากสตรีมและเขียนไปยังสตรีมได้ ในการดำเนินการนี้ ก็ไม่เพียงพอที่จะเปลี่ยนแปลงสิ่งใดในบล็อกสิทธิ์การเข้าถึง:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
ตอนนี้เรามาขอให้ค้นพบสคีมาข้อมูลในสตรีม โดยคลิกที่ปุ่ม "ค้นพบสคีมา" ด้วยเหตุนี้ บทบาท IAM จะได้รับการอัปเดต (จะมีการสร้างบทบาทใหม่) และการตรวจจับสคีมาจะเปิดตัวจากข้อมูลที่มาถึงในสตรีมแล้ว:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
ตอนนี้คุณต้องไปที่ตัวแก้ไข SQL เมื่อคุณคลิกที่ปุ่มนี้ หน้าต่างจะปรากฏขึ้นเพื่อขอให้คุณเปิดแอปพลิเคชัน - เลือกสิ่งที่คุณต้องการเปิดใช้งาน:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
แทรกเคียวรีแบบง่ายต่อไปนี้ลงในหน้าต่างเอดิเตอร์ SQL และคลิก บันทึกและรัน SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

ในฐานข้อมูลเชิงสัมพันธ์ คุณทำงานกับตารางโดยใช้คำสั่ง INSERT เพื่อเพิ่มบันทึกและคำสั่ง SELECT เพื่อสืบค้นข้อมูล ใน Amazon Kinesis Data Analytics คุณจะทำงานร่วมกับสตรีม (STREAM) และปั๊ม (PUMP) ซึ่งเป็นคำขอแทรกอย่างต่อเนื่องซึ่งจะแทรกข้อมูลจากสตรีมหนึ่งไปยังแอปพลิเคชันหนึ่งไปยังสตรีมอื่น

แบบสอบถาม SQL ที่นำเสนอข้างต้นค้นหาตั๋ว Aeroflot ที่ราคาต่ำกว่าห้าพันรูเบิล บันทึกทั้งหมดที่ตรงตามเงื่อนไขเหล่านี้จะอยู่ในสตรีม DESTINATION_SQL_STREAM

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
ในบล็อกปลายทาง เลือกสตรีม special_stream และในรายการดรอปดาวน์ชื่อสตรีมในแอปพลิเคชัน DESTINATION_SQL_STREAM:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
ผลลัพธ์ของการปรับเปลี่ยนทั้งหมดควรมีลักษณะคล้ายกับภาพด้านล่าง:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์

การสร้างและสมัครรับหัวข้อ SNS

ไปที่บริการแจ้งเตือนอย่างง่ายและสร้างหัวข้อใหม่ที่นั่นด้วยชื่อสายการบิน:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
สมัครสมาชิกหัวข้อนี้และระบุหมายเลขโทรศัพท์มือถือที่จะส่งการแจ้งเตือนทาง SMS:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์

สร้างตารางใน DynamoDB

หากต้องการจัดเก็บข้อมูลดิบจากสตรีม airline_tickets เรามาสร้างตารางใน DynamoDB ด้วยชื่อเดียวกันกันดีกว่า เราจะใช้ record_id เป็นคีย์หลัก:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์

การสร้างตัวรวบรวมฟังก์ชันแลมบ์ดา

มาสร้างฟังก์ชัน lambda ชื่อ Collector ซึ่งมีหน้าที่สำรวจกระแส airline_tickets และหากพบบันทึกใหม่ที่นั่น ให้แทรกบันทึกเหล่านี้ลงในตาราง DynamoDB แน่นอนว่านอกเหนือจากสิทธิ์เริ่มต้นแล้ว lambda นี้จะต้องมีสิทธิ์การอ่านสตรีมข้อมูล Kinesis และสิทธิ์การเขียน DynamoDB

การสร้างบทบาท IAM สำหรับฟังก์ชัน lambda ของตัวรวบรวม
ก่อนอื่น มาสร้างบทบาท IAM ใหม่สำหรับ lambda ชื่อ Lambda-TicketsProcessingRole:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
สำหรับตัวอย่างการทดสอบ นโยบาย AmazonKinesisReadOnlyAccess และ AmazonDynamoDBFullAccess ที่กำหนดค่าไว้ล่วงหน้านั้นค่อนข้างเหมาะสม ดังที่แสดงในภาพด้านล่าง:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์

แลมบ์ดานี้ควรเปิดใช้งานโดยทริกเกอร์จาก Kinesis เมื่อมีรายการใหม่เข้าสู่ airline_stream ดังนั้นเราจึงจำเป็นต้องเพิ่มทริกเกอร์ใหม่:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
สิ่งที่เหลืออยู่คือการใส่โค้ดและบันทึกแลมบ์ดา

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

การสร้างตัวแจ้งเตือนฟังก์ชันแลมบ์ดา

ฟังก์ชันแลมบ์ดาตัวที่สองซึ่งจะตรวจสอบสตรีมที่สอง (special_stream) และส่งการแจ้งเตือนไปยัง SNS นั้นถูกสร้างขึ้นในลักษณะที่คล้ายกัน ดังนั้นแลมบ์ดานี้จะต้องมีสิทธิ์เข้าถึงเพื่ออ่านจาก Kinesis และส่งข้อความไปยังหัวข้อ SNS ที่ระบุ ซึ่งจากนั้นบริการ SNS จะถูกส่งโดยบริการ SNS ไปยังสมาชิกทั้งหมดของหัวข้อนี้ (อีเมล, SMS ฯลฯ)

การสร้างบทบาท IAM
ขั้นแรก เราสร้างบทบาท IAM Lambda-KinesisAlarm สำหรับ lambda นี้ จากนั้นกำหนดบทบาทนี้ให้กับ lambda alarm_notifier ที่ถูกสร้างขึ้น:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์

lambda นี้ควรทำงานกับทริกเกอร์เพื่อให้บันทึกใหม่เข้าสู่ special_stream ดังนั้นคุณจึงต้องกำหนดค่าทริกเกอร์ในลักษณะเดียวกับที่เราทำกับ Collector lambda

เพื่อให้กำหนดค่า lambda นี้ได้ง่ายขึ้น เราขอแนะนำตัวแปรสภาพแวดล้อมใหม่ - TOPIC_ARN โดยที่เราใส่ ANR (Amazon Recourse Names) ของหัวข้อสายการบิน:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
และใส่โค้ดแลมบ์ดาลงไป มันไม่ซับซ้อนเลย:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

ดูเหมือนว่านี่คือจุดที่การกำหนดค่าระบบด้วยตนเองเสร็จสมบูรณ์ สิ่งที่เหลืออยู่คือการทดสอบและตรวจสอบให้แน่ใจว่าเราได้กำหนดค่าทุกอย่างถูกต้องแล้ว

ปรับใช้จากโค้ด Terraform

การเตรียมการที่จำเป็น

terraform เป็นเครื่องมือโอเพ่นซอร์สที่สะดวกมากสำหรับการปรับใช้โครงสร้างพื้นฐานจากโค้ด มีไวยากรณ์ของตัวเองที่ง่ายต่อการเรียนรู้และมีตัวอย่างมากมายเกี่ยวกับวิธีการและสิ่งที่จะปรับใช้ ตัวแก้ไข Atom หรือ Visual Studio Code มีปลั๊กอินที่มีประโยชน์มากมายที่ทำให้การทำงานกับ Terraform ง่ายขึ้น

คุณสามารถดาวน์โหลดการแจกจ่าย ด้วยเหตุนี้. การวิเคราะห์โดยละเอียดเกี่ยวกับความสามารถทั้งหมดของ Terraform นั้นอยู่นอกเหนือขอบเขตของบทความนี้ ดังนั้นเราจะจำกัดอยู่เพียงประเด็นหลักเท่านั้น

เริ่มอย่างไร

รหัสเต็มของโครงการคือ ในที่เก็บของฉัน. เราโคลนพื้นที่เก็บข้อมูลเพื่อตัวเราเอง ก่อนที่จะเริ่มต้น คุณต้องตรวจสอบให้แน่ใจว่าคุณได้ติดตั้งและกำหนดค่า AWS CLI แล้ว เนื่องจาก... Terraform จะค้นหาข้อมูลรับรองในไฟล์ ~/.aws/credentials

แนวปฏิบัติที่ดีคือการรันคำสั่ง plan ก่อนที่จะปรับใช้โครงสร้างพื้นฐานทั้งหมดเพื่อดูว่า Terraform กำลังสร้างอะไรให้เราในระบบคลาวด์:

terraform.exe plan

คุณจะได้รับแจ้งให้ป้อนหมายเลขโทรศัพท์เพื่อส่งการแจ้งเตือน ไม่จำเป็นต้องป้อนในขั้นตอนนี้

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
เมื่อวิเคราะห์แผนการดำเนินงานของโปรแกรมแล้ว เราก็สามารถเริ่มสร้างทรัพยากรได้:

terraform.exe apply

หลังจากส่งคำสั่งนี้คุณจะถูกขอให้ป้อนหมายเลขโทรศัพท์อีกครั้ง กด "ใช่" เมื่อมีคำถามเกี่ยวกับการดำเนินการจริงปรากฏขึ้น ซึ่งจะช่วยให้คุณสามารถตั้งค่าโครงสร้างพื้นฐานทั้งหมด ดำเนินการกำหนดค่าที่จำเป็นทั้งหมดของ EC2 ปรับใช้ฟังก์ชัน lambda ฯลฯ

หลังจากสร้างทรัพยากรทั้งหมดสำเร็จผ่านโค้ด Terraform แล้ว คุณต้องเข้าไปดูรายละเอียดของแอปพลิเคชัน Kinesis Analytics (ขออภัย ฉันไม่พบวิธีดำเนินการนี้จากโค้ดโดยตรง)

เปิดแอปพลิเคชัน:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
หลังจากนี้ คุณต้องตั้งชื่อสตรีมในแอปพลิเคชันอย่างชัดเจนโดยเลือกจากรายการแบบเลื่อนลง:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
ตอนนี้ทุกอย่างก็พร้อมที่จะไป

กำลังทดสอบแอปพลิเคชัน

ไม่ว่าคุณจะปรับใช้ระบบด้วยตนเองหรือผ่านโค้ด Terraform อย่างไร ระบบก็จะทำงานเหมือนเดิม

เราเข้าสู่ระบบผ่าน SSH ไปยังเครื่องเสมือน EC2 ที่ติดตั้ง Kinesis Agent และเรียกใช้สคริปต์ api_caller.py

sudo ./api_caller.py TOKEN

สิ่งที่คุณต้องทำคือรอ SMS ไปที่หมายเลขของคุณ:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
SMS - ข้อความมาถึงโทรศัพท์ของคุณภายในเกือบ 1 นาที:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์
ต้องรอดูว่าบันทึกต่างๆ ได้รับการบันทึกในฐานข้อมูล DynamoDB เพื่อการวิเคราะห์ที่มีรายละเอียดมากขึ้นในภายหลังหรือไม่ ตาราง airline_tickets มีข้อมูลโดยประมาณต่อไปนี้:

การรวม Aviasales API กับ Amazon Kinesis และความเรียบง่ายแบบไร้เซิร์ฟเวอร์

ข้อสรุป

ในระหว่างการทำงานนี้ ระบบประมวลผลข้อมูลออนไลน์ถูกสร้างขึ้นโดยใช้ Amazon Kinesis ตัวเลือกสำหรับการใช้ Kinesis Agent ร่วมกับ Kinesis Data Streams และการวิเคราะห์แบบเรียลไทม์ Kinesis Analytics โดยใช้คำสั่ง SQL รวมถึงการโต้ตอบของ Amazon Kinesis กับบริการ AWS อื่นๆ ได้รับการพิจารณาแล้ว

เราปรับใช้ระบบข้างต้นในสองวิธี: แบบแมนนวลที่ค่อนข้างยาว และแบบด่วนจากโค้ด Terraform

มีซอร์สโค้ดโครงการทั้งหมด ในที่เก็บ GitHub ของฉันฉันขอแนะนำให้คุณทำความคุ้นเคยกับมัน

ฉันยินดีที่จะหารือเกี่ยวกับบทความนี้ ฉันหวังว่าจะแสดงความคิดเห็นของคุณ ฉันหวังว่าจะวิจารณ์อย่างสร้างสรรค์

ฉันขอให้คุณประสบความสำเร็จ!

ที่มา: will.com

เพิ่มความคิดเห็น