สวัสดีทุกคน. เราแบ่งปันการแปลส่วนสุดท้ายของบทความซึ่งจัดทำขึ้นโดยเฉพาะสำหรับนักเรียนของหลักสูตร
Apache Beam และ DataFlow สำหรับไปป์ไลน์แบบเรียลไทม์
การตั้งค่า Google Cloud
หมายเหตุ: ฉันใช้ Google Cloud Shell เพื่อเรียกใช้ไปป์ไลน์และเผยแพร่ข้อมูลบันทึกผู้ใช้ เพราะฉันประสบปัญหาในการเรียกใช้ไปป์ไลน์ใน Python 3 Google Cloud Shell ใช้ Python 2 ซึ่งสอดคล้องกับ Apache Beam มากกว่า
ในการทำให้ไปป์ไลน์ทำงาน เราต้องเจาะลึกเข้าไปในการตั้งค่าเล็กน้อย สำหรับผู้ที่ไม่เคยใช้ GCP มาก่อน คุณต้องทำตาม 6 ขั้นตอนต่อไปนี้ให้ครบถ้วน
หลังจากนั้น เราจะต้องอัปโหลดสคริปต์ของเราไปยัง Google Cloud Storage และคัดลอกไปยัง Google Cloud Shel ของเรา การอัปโหลดไปยังที่เก็บข้อมูลบนคลาวด์นั้นค่อนข้างเล็กน้อย (สามารถหาคำอธิบายได้
2 รูป
คำสั่งที่เราต้องใช้ในการคัดลอกไฟล์และติดตั้งไลบรารีที่จำเป็นแสดงอยู่ด้านล่าง
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
การสร้างฐานข้อมูลและตารางของเรา
เมื่อเราทำตามขั้นตอนการตั้งค่าทั้งหมดเสร็จแล้ว สิ่งต่อไปที่เราต้องทำคือสร้างชุดข้อมูลและตารางใน BigQuery มีหลายวิธีในการทำเช่นนี้ แต่วิธีที่ง่ายที่สุดคือการใช้ Google Cloud Console โดยสร้างชุดข้อมูลก่อน คุณสามารถทำตามขั้นตอนต่อไปนี้
รูปที่ 3. สคีมาของตาราง
การเผยแพร่ข้อมูลบันทึกของผู้ใช้
Pub/Sub เป็นองค์ประกอบที่สำคัญของระบบไปป์ไลน์ของเรา เนื่องจากช่วยให้แอปพลิเคชันอิสระหลายตัวสามารถสื่อสารระหว่างกันได้ โดยเฉพาะอย่างยิ่ง มันทำงานเป็นตัวกลางให้เราส่งและรับข้อความระหว่างแอปพลิเคชัน สิ่งแรกที่เราต้องทำคือสร้างหัวข้อ (หัวข้อ) เพียงไปที่ Pub/Sub ในคอนโซลแล้วคลิกสร้างหัวข้อ
โค้ดด้านล่างเรียกสคริปต์ของเราเพื่อสร้างข้อมูลบันทึกที่กำหนดไว้ด้านบน จากนั้นเชื่อมต่อและส่งบันทึกไปยัง Pub/Sub สิ่งเดียวที่เราต้องทำคือสร้างวัตถุ ลูกค้าผู้จัดพิมพ์ระบุพาธธีมโดยใช้เมธอด topic_path
และเรียกใช้ฟังก์ชัน publish
с topic_path
และข้อมูล โปรดทราบว่าเรากำลังนำเข้า generate_log_line
จากสคริปต์ของเรา stream_logs
ดังนั้นตรวจสอบให้แน่ใจว่าไฟล์เหล่านั้นอยู่ในโฟลเดอร์เดียวกัน มิฉะนั้น คุณจะได้รับข้อผิดพลาดในการนำเข้า จากนั้นเราสามารถเรียกใช้ผ่านคอนโซล Google ของเราโดยใช้:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
เมื่อรันไฟล์แล้ว เราจะสามารถสังเกตเอาต์พุตของข้อมูลบันทึกไปยังคอนโซลได้ดังแสดงในรูปด้านล่าง สคริปต์นี้จะทำงานตราบเท่าที่เราไม่ได้ใช้ CTRL + Cเพื่อให้มันสมบูรณ์
รูปที่ 4 บทสรุป publish_logs.py
การเขียน Pipeline Code ของเรา
ตอนนี้เราได้ตั้งค่าทุกอย่างเรียบร้อยแล้ว เข้าสู่ส่วนที่สนุก นั่นก็คือ การเขียนโค้ดไปป์ไลน์ของเราโดยใช้ Beam และ Python ในการสร้าง Beam ไปป์ไลน์ เราจำเป็นต้องสร้างวัตถุไปป์ไลน์ (p) เมื่อเราสร้างวัตถุไปป์ไลน์แล้ว เราสามารถใช้หลายฟังก์ชันทีละฟังก์ชันโดยใช้โอเปอเรเตอร์ pipe (|)
. โดยทั่วไปแล้วเวิร์กโฟลว์จะมีลักษณะดังรูปด้านล่าง
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
ในโค้ดของเรา เราจะสร้างฟังก์ชันที่กำหนดโดยผู้ใช้สองฟังก์ชัน การทำงาน regex_clean
ซึ่งจะสแกนข้อมูลและแยกสตริงที่ตรงกันตามรายการของรูปแบบโดยใช้ฟังก์ชัน re.search
. ฟังก์ชันส่งคืนสตริงที่คั่นด้วยเครื่องหมายจุลภาค หากคุณไม่ใช่ผู้เชี่ยวชาญเกี่ยวกับนิพจน์ทั่วไป เราขอแนะนำให้ลองดูสิ่งนี้ datetime
ภายในฟังก์ชั่นเพื่อให้มันทำงานได้ ฉันได้รับข้อผิดพลาดในการนำเข้าที่จุดเริ่มต้นของไฟล์ ซึ่งเป็นเรื่องแปลก รายการนี้จะถูกส่งต่อไปยังฟังก์ชัน เขียนถึงBigQueryซึ่งเพียงแค่เพิ่มข้อมูลของเราลงในตาราง รหัสสำหรับ Batch DataFlow Job และ Streaming DataFlow Job ระบุไว้ด้านล่าง ข้อแตกต่างเพียงอย่างเดียวระหว่างรหัสชุดและสตรีมคือชุดที่เราอ่านจาก CSV src_path
การใช้ฟังก์ชัน ReadFromText
จาก บีม.
Batch DataFlow Job (การประมวลผลเป็นชุด)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
งานการสตรีม DataFlow (การประมวลผลสตรีม)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
กำลังดำเนินการไปป์ไลน์
เราสามารถเริ่มไปป์ไลน์ได้หลายวิธี หากเราต้องการ เราก็สามารถเรียกใช้จากเทอร์มินัลในเครื่องได้โดยการลงชื่อเข้าใช้ GCP จากระยะไกล
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
อย่างไรก็ตาม เราจะเรียกใช้ด้วย DataFlow เราสามารถทำได้ด้วยคำสั่งด้านล่างโดยตั้งค่าพารามิเตอร์ที่จำเป็นต่อไปนี้
project
- ID ของโครงการ GCP ของคุณrunner
เป็นไปป์ไลน์รันเนอร์ที่จะแยกวิเคราะห์โปรแกรมของคุณและสร้างไปป์ไลน์ของคุณ หากต้องการเรียกใช้ในระบบคลาวด์ คุณต้องระบุ DataflowRunnerstaging_location
เป็นเส้นทางไปยังที่เก็บข้อมูลบนคลาวด์ของ Cloud Dataflow สำหรับการจัดทำดัชนีแพ็คเกจรหัสที่จำเป็นโดยตัวจัดการที่ทำงานtemp_location
- เส้นทางไปยังที่เก็บข้อมูลบนคลาวด์ Cloud Dataflow สำหรับวางไฟล์งานชั่วคราวที่สร้างขึ้นระหว่างการทำงานของไปป์ไลน์streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
ขณะที่คำสั่งนี้ทำงานอยู่ เราสามารถไปที่แท็บ DataFlow ในคอนโซลของ Google และดูไปป์ไลน์ของเราได้ เมื่อคลิกที่ไปป์ไลน์ เราควรจะเห็นบางอย่างที่คล้ายกับรูปที่ 4 สำหรับจุดประสงค์ในการแก้ไขจุดบกพร่อง การไปที่บันทึกแล้วไปที่ Stackdriver เพื่อดูบันทึกโดยละเอียดจะมีประโยชน์มาก สิ่งนี้ช่วยให้ฉันแก้ปัญหาเกี่ยวกับไปป์ไลน์ได้ในหลายกรณี
รูปที่ 4: ท่อส่งลำแสง
การเข้าถึงข้อมูลของเราใน BigQuery
ดังนั้นเราควรมีไปป์ไลน์ที่ทำงานกับข้อมูลเข้ามาในตารางของเราแล้ว ในการทดสอบนี้ เราสามารถไปที่ BigQuery และดูข้อมูลได้ หลังจากใช้คำสั่งด้านล่าง คุณควรเห็นบรรทัดแรกของชุดข้อมูล ตอนนี้เรามีข้อมูลที่จัดเก็บไว้ใน BigQuery แล้ว เราสามารถทำการวิเคราะห์เพิ่มเติม รวมทั้งแชร์ข้อมูลกับเพื่อนร่วมงานและเริ่มตอบคำถามทางธุรกิจได้
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
รูปที่ 5: BigQuery
ข้อสรุป
เราหวังว่าโพสต์นี้จะเป็นตัวอย่างที่มีประโยชน์ในการสร้างไปป์ไลน์ข้อมูลการสตรีม ตลอดจนหาวิธีทำให้ข้อมูลเข้าถึงได้มากขึ้น การจัดเก็บข้อมูลในรูปแบบนี้ทำให้เรามีข้อดีหลายประการ ตอนนี้เราสามารถเริ่มตอบคำถามสำคัญ เช่น มีคนใช้ผลิตภัณฑ์ของเรากี่คน? ฐานผู้ใช้เติบโตขึ้นเมื่อเวลาผ่านไปหรือไม่? ผู้คนมีปฏิสัมพันธ์กับผลิตภัณฑ์ในแง่มุมใดมากที่สุด และมีข้อผิดพลาดที่ไม่ควรอยู่หรือไม่? คำถามเหล่านี้เป็นคำถามที่น่าสนใจสำหรับองค์กร จากข้อมูลเชิงลึกที่ได้จากคำตอบของคำถามเหล่านี้ เราจะสามารถปรับปรุงผลิตภัณฑ์และเพิ่มการมีส่วนร่วมของผู้ใช้ได้
Beam มีประโยชน์มากสำหรับการออกกำลังกายประเภทนี้ และยังมีกรณีการใช้งานอื่นๆ ที่น่าสนใจอีกมากมาย ตัวอย่างเช่น คุณสามารถวิเคราะห์ข้อมูลการขึ้นหุ้นแบบเรียลไทม์และทำการซื้อขายตามการวิเคราะห์ บางทีคุณอาจมีข้อมูลเซ็นเซอร์ที่มาจากยานพาหนะและต้องการคำนวณระดับการจราจร ตัวอย่างเช่น คุณสามารถเป็นบริษัทเกมที่รวบรวมข้อมูลผู้ใช้และใช้ข้อมูลนั้นเพื่อสร้างแดชบอร์ดเพื่อติดตามเมตริกหลัก โอเค สุภาพบุรุษ นี่คือหัวข้อสำหรับโพสต์อื่น ขอบคุณที่อ่าน และสำหรับผู้ที่ต้องการดูโค้ดแบบเต็ม ด้านล่างนี้คือลิงก์ไปยัง GitHub ของฉัน
นั่นคือทั้งหมดที่
ที่มา: will.com