เฮ้ ฮับ!
คุณชอบบินเครื่องบินไหม? ฉันชอบมัน แต่ระหว่างการแยกตัวเอง ฉันก็หลงรักการวิเคราะห์ข้อมูลเกี่ยวกับตั๋วเครื่องบินจากแหล่งข้อมูลชื่อดังแห่งหนึ่ง - Aviasales
วันนี้เราจะวิเคราะห์การทำงานของ Amazon Kinesis, สร้างระบบสตรีมมิ่งด้วยการวิเคราะห์แบบเรียลไทม์, ติดตั้งฐานข้อมูล Amazon DynamoDB NoSQL เป็นที่จัดเก็บข้อมูลหลัก และตั้งค่าการแจ้งเตือนทาง SMS สำหรับตั๋วที่น่าสนใจ
รายละเอียดทั้งหมดอยู่ภายใต้การตัด! ไป!
การแนะนำ
ตัวอย่างเช่น เราจำเป็นต้องเข้าถึง
วัตถุประสงค์หลักของบทความนี้คือเพื่อให้ความเข้าใจทั่วไปเกี่ยวกับการใช้การสตรีมข้อมูลใน AWS เราคำนึงว่าข้อมูลที่ส่งคืนโดย API ที่ใช้นั้นไม่ทันสมัยอย่างเคร่งครัดและถูกส่งจากแคชซึ่งก็คือ สร้างขึ้นจากการค้นหาโดยผู้ใช้เว็บไซต์ Aviasales.ru และ Jetradar.com ในช่วง 48 ชั่วโมงที่ผ่านมา
Kinesis-agent ที่ติดตั้งบนเครื่องที่ผลิต ซึ่งได้รับผ่าน API จะแยกวิเคราะห์และส่งข้อมูลไปยังสตรีมที่ต้องการโดยอัตโนมัติผ่าน Kinesis Data Analytics เวอร์ชัน Raw ของสตรีมนี้จะถูกเขียนไปยังร้านค้าโดยตรง พื้นที่จัดเก็บข้อมูลดิบที่ใช้งานใน DynamoDB จะช่วยให้วิเคราะห์ตั๋วได้ลึกยิ่งขึ้นผ่านเครื่องมือ BI เช่น AWS Quick Sight
เราจะพิจารณาสองทางเลือกสำหรับการปรับใช้โครงสร้างพื้นฐานทั้งหมด:
- คู่มือ - ผ่าน AWS Management Console;
- โครงสร้างพื้นฐานจากโค้ด Terraform นั้นมีไว้สำหรับผู้สร้างอัตโนมัติที่ขี้เกียจ
สถาปัตยกรรมของระบบที่พัฒนาแล้ว
ส่วนประกอบที่ใช้:
Aviasales API — ข้อมูลที่ส่งคืนโดย API นี้จะถูกนำมาใช้สำหรับงานต่อๆ ไปทั้งหมดอินสแตนซ์ผู้ผลิต EC2 — เครื่องเสมือนปกติในระบบคลาวด์ที่จะสร้างสตรีมข้อมูลอินพุต:ตัวแทน Kinesis เป็นแอปพลิเคชัน Java ที่ติดตั้งในเครื่องซึ่งมอบวิธีง่ายๆ ในการรวบรวมและส่งข้อมูลไปยัง Kinesis (Kinesis Data Streams หรือ Kinesis Firehose) เอเจนต์จะตรวจสอบชุดของไฟล์ในไดเร็กทอรีที่ระบุอย่างต่อเนื่องและส่งข้อมูลใหม่ไปยัง Kinesisสคริปต์ผู้เรียก API — สคริปต์ Python ที่สร้างคำขอไปยัง API และใส่การตอบกลับลงในโฟลเดอร์ที่ Kinesis Agent ตรวจสอบ
สตรีมข้อมูล Kinesis — บริการสตรีมข้อมูลแบบเรียลไทม์พร้อมความสามารถในการขยายขนาดที่กว้างการวิเคราะห์ไคเนซิส เป็นบริการแบบไร้เซิร์ฟเวอร์ที่ทำให้การวิเคราะห์ข้อมูลสตรีมมิ่งแบบเรียลไทม์ง่ายขึ้น Amazon Kinesis Data Analytics กำหนดค่าทรัพยากรแอปพลิเคชันและปรับขนาดโดยอัตโนมัติเพื่อรองรับปริมาณข้อมูลขาเข้าAWS แลมบ์ดา — บริการที่ให้คุณรันโค้ดโดยไม่ต้องสำรองข้อมูลหรือตั้งค่าเซิร์ฟเวอร์ พลังการประมวลผลทั้งหมดจะถูกปรับขนาดโดยอัตโนมัติสำหรับการโทรแต่ละครั้งอเมซอน ไดนาโมดีบี - ฐานข้อมูลของคู่คีย์-ค่าและเอกสารที่ให้เวลาแฝงน้อยกว่า 10 มิลลิวินาทีเมื่อทำงานในทุกขนาด เมื่อใช้ DynamoDB คุณไม่จำเป็นต้องจัดเตรียม แพตช์ หรือจัดการเซิร์ฟเวอร์ใดๆ DynamoDB ปรับขนาดตารางโดยอัตโนมัติเพื่อปรับจำนวนทรัพยากรที่มีอยู่และรักษาประสิทธิภาพสูง ไม่จำเป็นต้องมีการดูแลระบบอเมซอน SNS - บริการที่มีการจัดการเต็มรูปแบบสำหรับการส่งข้อความโดยใช้โมเดลผู้เผยแพร่-สมาชิก (Pub/Sub) ซึ่งคุณสามารถแยกไมโครเซอร์วิส ระบบแบบกระจาย และแอปพลิเคชันแบบไร้เซิร์ฟเวอร์ได้ SNS สามารถใช้เพื่อส่งข้อมูลไปยังผู้ใช้ผ่านการแจ้งเตือนแบบพุชบนมือถือ ข้อความ SMS และอีเมล
การฝึกอบรมเบื้องต้น
เพื่อจำลองกระแสข้อมูล ฉันตัดสินใจใช้ข้อมูลตั๋วเครื่องบินที่ส่งคืนโดย Aviasales API ใน
ดังนั้น มาลงทะเบียนและรับโทเค็นของเรากันดีกว่า
คำขอตัวอย่างอยู่ด้านล่าง:
http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API
วิธีการรับข้อมูลจาก API ข้างต้นโดยการระบุโทเค็นในคำขอนั้นใช้ได้ แต่ฉันต้องการส่งโทเค็นการเข้าถึงผ่านส่วนหัว ดังนั้นเราจะใช้วิธีนี้ในสคริปต์ api_caller.py
ตัวอย่างคำตอบ:
{{
"success":true,
"data":[{
"show_to_affiliates":true,
"trip_class":0,
"origin":"LED",
"destination":"HKT",
"depart_date":"2015-10-01",
"return_date":"",
"number_of_changes":1,
"value":29127,
"found_at":"2015-09-24T00:06:12+04:00",
"distance":8015,
"actual":true
}]
}
ตัวอย่างการตอบสนองของ API ด้านบนแสดงตั๋วจากเซนต์ปีเตอร์สเบิร์กถึงฟุก... โอ้ ช่างเป็นความฝัน...
เนื่องจากฉันมาจากคาซาน และภูเก็ตตอนนี้เป็น "เพียงความฝัน" เรามาดูตั๋วจากเซนต์ปีเตอร์สเบิร์กไปคาซานกันดีกว่า
จะถือว่าคุณมีบัญชี AWS อยู่แล้ว ฉันอยากจะแจ้งให้ทราบเป็นพิเศษทันทีว่า Kinesis และการส่งการแจ้งเตือนผ่าน SMS ไม่รวมอยู่ในรายปี
ฟรีเทียร์ (ใช้งานฟรี) . แต่ถึงแม้จะมีเงินสองสามดอลลาร์อยู่ในใจ แต่ก็ค่อนข้างเป็นไปได้ที่จะสร้างระบบที่เสนอและเล่นกับมัน และแน่นอนว่าอย่าลืมลบทรัพยากรทั้งหมดหลังจากที่ไม่จำเป็นต้องใช้อีกต่อไป
โชคดีที่ฟังก์ชัน DynamoDb และ lambda จะให้บริการฟรีหากเราใช้จ่ายถึงขีดจำกัดการใช้งานฟรีรายเดือนของเรา ตัวอย่างเช่น สำหรับ DynamoDB: พื้นที่จัดเก็บ 25 GB, 25 WCU/RCU และ 100 ล้านข้อความค้นหา และการเรียกใช้ฟังก์ชัน lambda ล้านครั้งต่อเดือน
การปรับใช้ระบบด้วยตนเอง
การตั้งค่า Kinesis Data Stream
ไปที่บริการ Kinesis Data Streams และสร้างสตรีมใหม่สองสตรีม โดยแต่ละสตรีมมีหนึ่งส่วน
เศษคืออะไร?
ชาร์ดคือหน่วยถ่ายโอนข้อมูลพื้นฐานของสตรีม Amazon Kinesis ส่วนหนึ่งมีการถ่ายโอนข้อมูลอินพุตที่ความเร็ว 1 MB/s และการถ่ายโอนข้อมูลเอาท์พุตที่ความเร็ว 2 MB/s หนึ่งเซ็กเมนต์รองรับรายการ PUT สูงสุด 1000 รายการต่อวินาที เมื่อสร้างสตรีมข้อมูล คุณต้องระบุจำนวนกลุ่มที่ต้องการ ตัวอย่างเช่น คุณสามารถสร้างสตรีมข้อมูลที่มีสองส่วนได้ สตรีมข้อมูลนี้จะให้การถ่ายโอนข้อมูลอินพุตที่ 2 MB/s และการถ่ายโอนข้อมูลเอาท์พุตที่ 4 MB/s ซึ่งรองรับบันทึก PUT สูงสุด 2000 รายการต่อวินาที
ยิ่งมีส่วนแบ่งข้อมูลในสตรีมของคุณมากเท่าใด ปริมาณงานก็จะยิ่งมากขึ้นเท่านั้น โดยหลักการแล้ว นี่คือวิธีการปรับขนาดโฟลว์ - โดยการเพิ่มส่วนย่อย แต่ยิ่งคุณมีเศษมากเท่าไร ราคาก็ยิ่งสูงขึ้นเท่านั้น แต่ละชาร์ดมีราคา 1,5 เซนต์ต่อชั่วโมง และเพิ่มอีก 1.4 เซนต์สำหรับเพย์โหลด PUT ทุก ๆ ล้านหน่วย
มาสร้างกระแสใหม่ด้วยชื่อกันเถอะ ตั๋วเครื่องบิน1 ชิ้นก็เพียงพอสำหรับเขา:
ตอนนี้เรามาสร้างหัวข้อใหม่โดยใช้ชื่อกัน พิเศษ_สตรีม:
การตั้งค่าผู้ผลิต
ในการวิเคราะห์งาน การใช้ EC2 instance ทั่วไปเป็นผู้ผลิตข้อมูลก็เพียงพอแล้ว ไม่จำเป็นต้องเป็นเครื่องเสมือนที่ทรงพลังและมีราคาแพง สปอต t2.micro ก็ทำได้ดี
หมายเหตุสำคัญ: ตัวอย่างเช่น คุณควรใช้อิมเมจ - Amazon Linux AMI 2018.03.0 โดยมีการตั้งค่าน้อยกว่าสำหรับการเปิดใช้ Kinesis Agent อย่างรวดเร็ว
ไปที่บริการ EC2 สร้างเครื่องเสมือนใหม่ เลือก AMI ที่ต้องการด้วยประเภท t2.micro ซึ่งรวมอยู่ใน Free Tier:
เพื่อให้เครื่องเสมือนที่สร้างขึ้นใหม่สามารถโต้ตอบกับบริการ Kinesis ได้ จะต้องได้รับสิทธิ์ในการดำเนินการดังกล่าว วิธีที่ดีที่สุดในการทำเช่นนี้คือการกำหนดบทบาท IAM ดังนั้น ในจอภาพ ขั้นตอนที่ 3: กำหนดค่ารายละเอียดอินสแตนซ์ คุณควรเลือก สร้างบทบาท IAM ใหม่:
การสร้างบทบาท IAM สำหรับ EC2
ในหน้าต่างที่เปิดขึ้น ให้เลือกว่าเรากำลังสร้างบทบาทใหม่สำหรับ EC2 และไปที่ส่วนสิทธิ์:
จากตัวอย่างการฝึกอบรม เราไม่จำเป็นต้องเจาะลึกรายละเอียดปลีกย่อยทั้งหมดของการกำหนดค่าสิทธิ์ทรัพยากร ดังนั้นเราจะเลือกนโยบายที่ Amazon กำหนดค่าไว้ล่วงหน้า: AmazonKinesisFullAccess และ CloudWatchFullAccess
เรามาตั้งชื่อที่มีความหมายสำหรับบทบาทนี้กัน เช่น: EC2-KinesisStreams-FullAccess ผลลัพธ์ควรจะเหมือนกับที่แสดงในภาพด้านล่าง:
หลังจากสร้างบทบาทใหม่นี้แล้ว อย่าลืมแนบบทบาทเข้ากับอินสแตนซ์เครื่องเสมือนที่สร้างขึ้น:
เราไม่เปลี่ยนแปลงสิ่งอื่นใดบนหน้าจอนี้และไปยังหน้าต่างถัดไป
คุณสามารถปล่อยให้การตั้งค่าฮาร์ดไดรฟ์เป็นค่าเริ่มต้นได้ เช่นเดียวกับแท็ก (แม้ว่าจะเป็นวิธีปฏิบัติที่ดีที่จะใช้แท็ก แต่อย่างน้อยก็ตั้งชื่ออินสแตนซ์และระบุสภาพแวดล้อม)
ตอนนี้เราอยู่ในขั้นตอนที่ 6: แท็บกำหนดค่ากลุ่มความปลอดภัย ซึ่งคุณจะต้องสร้างใหม่หรือระบุกลุ่มความปลอดภัยที่มีอยู่ ซึ่งช่วยให้คุณเชื่อมต่อผ่าน ssh (พอร์ต 22) ไปยังอินสแตนซ์ได้ เลือกแหล่งที่มา -> IP ของฉันที่นั่น และคุณสามารถเปิดใช้งานอินสแตนซ์ได้
ทันทีที่เปลี่ยนเป็นสถานะการทำงาน คุณสามารถลองเชื่อมต่อผ่าน ssh ได้
เพื่อให้สามารถทำงานร่วมกับ Kinesis Agent ได้ หลังจากเชื่อมต่อกับเครื่องสำเร็จแล้ว คุณต้องป้อนคำสั่งต่อไปนี้ในเทอร์มินัล:
sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent
มาสร้างโฟลเดอร์เพื่อบันทึกการตอบกลับ API:
sudo mkdir /var/log/airline_tickets
ก่อนที่จะเริ่มเอเจนต์ คุณต้องกำหนดค่าการกำหนดค่า:
sudo vim /etc/aws-kinesis/agent.json
เนื้อหาของไฟล์ agent.json ควรมีลักษณะดังนี้:
{
"cloudwatch.emitMetrics": true,
"kinesis.endpoint": "",
"firehose.endpoint": "",
"flows": [
{
"filePattern": "/var/log/airline_tickets/*log",
"kinesisStream": "airline_tickets",
"partitionKeyOption": "RANDOM",
"dataProcessingOptions": [
{
"optionName": "CSVTOJSON",
"customFieldNames": ["cost","trip_class","show_to_affiliates",
"return_date","origin","number_of_changes","gate","found_at",
"duration","distance","destination","depart_date","actual","record_id"]
}
]
}
]
}
ดังที่เห็นได้จากไฟล์การกำหนดค่า เอเจนต์จะตรวจสอบไฟล์ที่มีนามสกุล .log ในไดเร็กทอรี /var/log/airline_tickets/ จากนั้นแยกวิเคราะห์และถ่ายโอนไปยังสตรีม airline_tickets
เรารีสตาร์ทบริการและตรวจสอบให้แน่ใจว่าบริการนั้นเปิดใช้งานอยู่:
sudo service aws-kinesis-agent restart
ตอนนี้เรามาดาวน์โหลดสคริปต์ Python ที่จะขอข้อมูลจาก API กัน:
REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer
wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt
สคริปต์ api_caller.py ร้องขอข้อมูลจาก Aviasales และบันทึกการตอบสนองที่ได้รับในไดเร็กทอรีที่ตัวแทน Kinesis สแกน การใช้งานสคริปต์นี้ค่อนข้างเป็นมาตรฐาน มีคลาส TicketsApi ซึ่งช่วยให้คุณสามารถดึง API แบบอะซิงโครนัสได้ เราส่งส่วนหัวที่มีโทเค็นและขอพารามิเตอร์ไปยังคลาสนี้:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
เพื่อทดสอบการตั้งค่าและการทำงานที่ถูกต้องของเอเจนต์ ให้ทดสอบรันสคริปต์ api_caller.py:
sudo ./api_caller.py TOKEN
และเราดูผลลัพธ์ของการทำงานในบันทึกของตัวแทนและในแท็บการตรวจสอบในสตรีมข้อมูล airline_tickets:
tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
อย่างที่คุณเห็น ทุกอย่างทำงานได้และ Kinesis Agent ส่งข้อมูลไปยังสตรีมได้สำเร็จ ตอนนี้เรามากำหนดค่าคอนซูเมอร์กันดีกว่า
การตั้งค่าการวิเคราะห์ข้อมูล Kinesis
มาดูส่วนประกอบส่วนกลางของทั้งระบบกัน - สร้างแอปพลิเคชันใหม่ใน Kinesis Data Analytics ชื่อ kinesis_analytics_airlines_app:
Kinesis Data Analytics ช่วยให้คุณดำเนินการวิเคราะห์ข้อมูลแบบเรียลไทม์จาก Kinesis Streams โดยใช้ภาษา SQL เป็นบริการปรับขนาดอัตโนมัติโดยสมบูรณ์ (ไม่เหมือนกับ Kinesis Streams) ที่:
- ช่วยให้คุณสร้างสตรีมใหม่ (Output Stream) ตามคำขอไปยังแหล่งข้อมูล
- จัดเตรียมสตรีมพร้อมข้อผิดพลาดที่เกิดขึ้นในขณะที่แอปพลิเคชันกำลังทำงาน (สตรีมข้อผิดพลาด)
- สามารถกำหนดโครงร่างข้อมูลอินพุตได้โดยอัตโนมัติ (สามารถกำหนดใหม่ได้ด้วยตนเองหากจำเป็น)
นี่ไม่ใช่บริการราคาถูก - 0.11 USD ต่อชั่วโมงการทำงาน ดังนั้นคุณควรใช้อย่างระมัดระวังและลบออกเมื่อดำเนินการเสร็จ
มาเชื่อมต่อแอปพลิเคชันกับแหล่งข้อมูล:
เลือกสตรีมที่เราจะเชื่อมต่อด้วย (airline_tickets):
ถัดไป คุณต้องแนบบทบาท IAM ใหม่เพื่อให้แอปพลิเคชันสามารถอ่านจากสตรีมและเขียนไปยังสตรีมได้ ในการดำเนินการนี้ ก็ไม่เพียงพอที่จะเปลี่ยนแปลงสิ่งใดในบล็อกสิทธิ์การเข้าถึง:
ตอนนี้เรามาขอให้ค้นพบสคีมาข้อมูลในสตรีม โดยคลิกที่ปุ่ม "ค้นพบสคีมา" ด้วยเหตุนี้ บทบาท IAM จะได้รับการอัปเดต (จะมีการสร้างบทบาทใหม่) และการตรวจจับสคีมาจะเปิดตัวจากข้อมูลที่มาถึงในสตรีมแล้ว:
ตอนนี้คุณต้องไปที่ตัวแก้ไข SQL เมื่อคุณคลิกที่ปุ่มนี้ หน้าต่างจะปรากฏขึ้นเพื่อขอให้คุณเปิดแอปพลิเคชัน - เลือกสิ่งที่คุณต้องการเปิดใช้งาน:
แทรกเคียวรีแบบง่ายต่อไปนี้ลงในหน้าต่างเอดิเตอร์ SQL และคลิก บันทึกและรัน SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
ในฐานข้อมูลเชิงสัมพันธ์ คุณทำงานกับตารางโดยใช้คำสั่ง INSERT เพื่อเพิ่มบันทึกและคำสั่ง SELECT เพื่อสืบค้นข้อมูล ใน Amazon Kinesis Data Analytics คุณจะทำงานร่วมกับสตรีม (STREAM) และปั๊ม (PUMP) ซึ่งเป็นคำขอแทรกอย่างต่อเนื่องซึ่งจะแทรกข้อมูลจากสตรีมหนึ่งไปยังแอปพลิเคชันหนึ่งไปยังสตรีมอื่น
แบบสอบถาม SQL ที่นำเสนอข้างต้นค้นหาตั๋ว Aeroflot ที่ราคาต่ำกว่าห้าพันรูเบิล บันทึกทั้งหมดที่ตรงตามเงื่อนไขเหล่านี้จะอยู่ในสตรีม DESTINATION_SQL_STREAM
ในบล็อกปลายทาง เลือกสตรีม special_stream และในรายการดรอปดาวน์ชื่อสตรีมในแอปพลิเคชัน DESTINATION_SQL_STREAM:
ผลลัพธ์ของการปรับเปลี่ยนทั้งหมดควรมีลักษณะคล้ายกับภาพด้านล่าง:
การสร้างและสมัครรับหัวข้อ SNS
ไปที่บริการแจ้งเตือนอย่างง่ายและสร้างหัวข้อใหม่ที่นั่นด้วยชื่อสายการบิน:
สมัครสมาชิกหัวข้อนี้และระบุหมายเลขโทรศัพท์มือถือที่จะส่งการแจ้งเตือนทาง SMS:
สร้างตารางใน DynamoDB
หากต้องการจัดเก็บข้อมูลดิบจากสตรีม airline_tickets เรามาสร้างตารางใน DynamoDB ด้วยชื่อเดียวกันกันดีกว่า เราจะใช้ record_id เป็นคีย์หลัก:
การสร้างตัวรวบรวมฟังก์ชันแลมบ์ดา
มาสร้างฟังก์ชัน lambda ชื่อ Collector ซึ่งมีหน้าที่สำรวจกระแส airline_tickets และหากพบบันทึกใหม่ที่นั่น ให้แทรกบันทึกเหล่านี้ลงในตาราง DynamoDB แน่นอนว่านอกเหนือจากสิทธิ์เริ่มต้นแล้ว lambda นี้จะต้องมีสิทธิ์การอ่านสตรีมข้อมูล Kinesis และสิทธิ์การเขียน DynamoDB
การสร้างบทบาท IAM สำหรับฟังก์ชัน lambda ของตัวรวบรวม
ก่อนอื่น มาสร้างบทบาท IAM ใหม่สำหรับ lambda ชื่อ Lambda-TicketsProcessingRole:
สำหรับตัวอย่างการทดสอบ นโยบาย AmazonKinesisReadOnlyAccess และ AmazonDynamoDBFullAccess ที่กำหนดค่าไว้ล่วงหน้านั้นค่อนข้างเหมาะสม ดังที่แสดงในภาพด้านล่าง:
แลมบ์ดานี้ควรเปิดใช้งานโดยทริกเกอร์จาก Kinesis เมื่อมีรายการใหม่เข้าสู่ airline_stream ดังนั้นเราจึงจำเป็นต้องเพิ่มทริกเกอร์ใหม่:
สิ่งที่เหลืออยู่คือการใส่โค้ดและบันทึกแลมบ์ดา
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
การสร้างตัวแจ้งเตือนฟังก์ชันแลมบ์ดา
ฟังก์ชันแลมบ์ดาตัวที่สองซึ่งจะตรวจสอบสตรีมที่สอง (special_stream) และส่งการแจ้งเตือนไปยัง SNS นั้นถูกสร้างขึ้นในลักษณะที่คล้ายกัน ดังนั้นแลมบ์ดานี้จะต้องมีสิทธิ์เข้าถึงเพื่ออ่านจาก Kinesis และส่งข้อความไปยังหัวข้อ SNS ที่ระบุ ซึ่งจากนั้นบริการ SNS จะถูกส่งโดยบริการ SNS ไปยังสมาชิกทั้งหมดของหัวข้อนี้ (อีเมล, SMS ฯลฯ)
การสร้างบทบาท IAM
ขั้นแรก เราสร้างบทบาท IAM Lambda-KinesisAlarm สำหรับ lambda นี้ จากนั้นกำหนดบทบาทนี้ให้กับ lambda alarm_notifier ที่ถูกสร้างขึ้น:
lambda นี้ควรทำงานกับทริกเกอร์เพื่อให้บันทึกใหม่เข้าสู่ special_stream ดังนั้นคุณจึงต้องกำหนดค่าทริกเกอร์ในลักษณะเดียวกับที่เราทำกับ Collector lambda
เพื่อให้กำหนดค่า lambda นี้ได้ง่ายขึ้น เราขอแนะนำตัวแปรสภาพแวดล้อมใหม่ - TOPIC_ARN โดยที่เราใส่ ANR (Amazon Recourse Names) ของหัวข้อสายการบิน:
และใส่โค้ดแลมบ์ดาลงไป มันไม่ซับซ้อนเลย:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
ดูเหมือนว่านี่คือจุดที่การกำหนดค่าระบบด้วยตนเองเสร็จสมบูรณ์ สิ่งที่เหลืออยู่คือการทดสอบและตรวจสอบให้แน่ใจว่าเราได้กำหนดค่าทุกอย่างถูกต้องแล้ว
ปรับใช้จากโค้ด Terraform
การเตรียมการที่จำเป็น
คุณสามารถดาวน์โหลดการแจกจ่าย
เริ่มอย่างไร
รหัสเต็มของโครงการคือ
แนวปฏิบัติที่ดีคือการรันคำสั่ง plan ก่อนที่จะปรับใช้โครงสร้างพื้นฐานทั้งหมดเพื่อดูว่า Terraform กำลังสร้างอะไรให้เราในระบบคลาวด์:
terraform.exe plan
คุณจะได้รับแจ้งให้ป้อนหมายเลขโทรศัพท์เพื่อส่งการแจ้งเตือน ไม่จำเป็นต้องป้อนในขั้นตอนนี้
เมื่อวิเคราะห์แผนการดำเนินงานของโปรแกรมแล้ว เราก็สามารถเริ่มสร้างทรัพยากรได้:
terraform.exe apply
หลังจากส่งคำสั่งนี้คุณจะถูกขอให้ป้อนหมายเลขโทรศัพท์อีกครั้ง กด "ใช่" เมื่อมีคำถามเกี่ยวกับการดำเนินการจริงปรากฏขึ้น ซึ่งจะช่วยให้คุณสามารถตั้งค่าโครงสร้างพื้นฐานทั้งหมด ดำเนินการกำหนดค่าที่จำเป็นทั้งหมดของ EC2 ปรับใช้ฟังก์ชัน lambda ฯลฯ
หลังจากสร้างทรัพยากรทั้งหมดสำเร็จผ่านโค้ด Terraform แล้ว คุณต้องเข้าไปดูรายละเอียดของแอปพลิเคชัน Kinesis Analytics (ขออภัย ฉันไม่พบวิธีดำเนินการนี้จากโค้ดโดยตรง)
เปิดแอปพลิเคชัน:
หลังจากนี้ คุณต้องตั้งชื่อสตรีมในแอปพลิเคชันอย่างชัดเจนโดยเลือกจากรายการแบบเลื่อนลง:
ตอนนี้ทุกอย่างก็พร้อมที่จะไป
กำลังทดสอบแอปพลิเคชัน
ไม่ว่าคุณจะปรับใช้ระบบด้วยตนเองหรือผ่านโค้ด Terraform อย่างไร ระบบก็จะทำงานเหมือนเดิม
เราเข้าสู่ระบบผ่าน SSH ไปยังเครื่องเสมือน EC2 ที่ติดตั้ง Kinesis Agent และเรียกใช้สคริปต์ api_caller.py
sudo ./api_caller.py TOKEN
สิ่งที่คุณต้องทำคือรอ SMS ไปที่หมายเลขของคุณ:
SMS - ข้อความมาถึงโทรศัพท์ของคุณภายในเกือบ 1 นาที:
ต้องรอดูว่าบันทึกต่างๆ ได้รับการบันทึกในฐานข้อมูล DynamoDB เพื่อการวิเคราะห์ที่มีรายละเอียดมากขึ้นในภายหลังหรือไม่ ตาราง airline_tickets มีข้อมูลโดยประมาณต่อไปนี้:
ข้อสรุป
ในระหว่างการทำงานนี้ ระบบประมวลผลข้อมูลออนไลน์ถูกสร้างขึ้นโดยใช้ Amazon Kinesis ตัวเลือกสำหรับการใช้ Kinesis Agent ร่วมกับ Kinesis Data Streams และการวิเคราะห์แบบเรียลไทม์ Kinesis Analytics โดยใช้คำสั่ง SQL รวมถึงการโต้ตอบของ Amazon Kinesis กับบริการ AWS อื่นๆ ได้รับการพิจารณาแล้ว
เราปรับใช้ระบบข้างต้นในสองวิธี: แบบแมนนวลที่ค่อนข้างยาว และแบบด่วนจากโค้ด Terraform
มีซอร์สโค้ดโครงการทั้งหมด
ฉันยินดีที่จะหารือเกี่ยวกับบทความนี้ ฉันหวังว่าจะแสดงความคิดเห็นของคุณ ฉันหวังว่าจะวิจารณ์อย่างสร้างสรรค์
ฉันขอให้คุณประสบความสำเร็จ!
ที่มา: will.com