เราสร้างไปป์ไลน์สำหรับการประมวลผลข้อมูลแบบสตรีม ส่วนที่ 2

สวัสดีทุกคน. เราแบ่งปันการแปลส่วนสุดท้ายของบทความซึ่งจัดทำขึ้นโดยเฉพาะสำหรับนักเรียนของหลักสูตร วิศวกรข้อมูล. ส่วนแรกสามารถหาได้ ที่นี่.

Apache Beam และ DataFlow สำหรับไปป์ไลน์แบบเรียลไทม์

เราสร้างไปป์ไลน์สำหรับการประมวลผลข้อมูลแบบสตรีม ส่วนที่ 2

การตั้งค่า Google Cloud

หมายเหตุ: ฉันใช้ Google Cloud Shell เพื่อเรียกใช้ไปป์ไลน์และเผยแพร่ข้อมูลบันทึกผู้ใช้ เพราะฉันประสบปัญหาในการเรียกใช้ไปป์ไลน์ใน Python 3 Google Cloud Shell ใช้ Python 2 ซึ่งสอดคล้องกับ Apache Beam มากกว่า

ในการทำให้ไปป์ไลน์ทำงาน เราต้องเจาะลึกเข้าไปในการตั้งค่าเล็กน้อย สำหรับผู้ที่ไม่เคยใช้ GCP มาก่อน คุณต้องทำตาม 6 ขั้นตอนต่อไปนี้ให้ครบถ้วน หน้า.

หลังจากนั้น เราจะต้องอัปโหลดสคริปต์ของเราไปยัง Google Cloud Storage และคัดลอกไปยัง Google Cloud Shel ของเรา การอัปโหลดไปยังที่เก็บข้อมูลบนคลาวด์นั้นค่อนข้างเล็กน้อย (สามารถหาคำอธิบายได้ ที่นี่). ในการคัดลอกไฟล์ของเรา เราสามารถเปิด Google Cloud Shel จากแถบเครื่องมือโดยคลิกไอคอนแรกทางซ้ายในรูปที่ 2 ด้านล่าง

เราสร้างไปป์ไลน์สำหรับการประมวลผลข้อมูลแบบสตรีม ส่วนที่ 2
2 รูป

คำสั่งที่เราต้องใช้ในการคัดลอกไฟล์และติดตั้งไลบรารีที่จำเป็นแสดงอยู่ด้านล่าง

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

การสร้างฐานข้อมูลและตารางของเรา

เมื่อเราทำตามขั้นตอนการตั้งค่าทั้งหมดเสร็จแล้ว สิ่งต่อไปที่เราต้องทำคือสร้างชุดข้อมูลและตารางใน BigQuery มีหลายวิธีในการทำเช่นนี้ แต่วิธีที่ง่ายที่สุดคือการใช้ Google Cloud Console โดยสร้างชุดข้อมูลก่อน คุณสามารถทำตามขั้นตอนต่อไปนี้ ลิงค์เพื่อสร้างตารางด้วยสคีมา โต๊ะของเราจะมี 7 คอลัมน์สอดคล้องกับส่วนประกอบของบันทึกผู้ใช้แต่ละคน เพื่อความสะดวก เราจะกำหนดคอลัมน์ทั้งหมดเป็นสตริง (ของประเภทสตริง) ยกเว้นตัวแปร timelocal และตั้งชื่อตามตัวแปรที่เราสร้างขึ้นก่อนหน้านี้ เค้าโครงตารางของเราควรมีลักษณะเหมือนรูปที่ 3

เราสร้างไปป์ไลน์สำหรับการประมวลผลข้อมูลแบบสตรีม ส่วนที่ 2
รูปที่ 3. สคีมาของตาราง

การเผยแพร่ข้อมูลบันทึกของผู้ใช้

Pub/Sub เป็นองค์ประกอบที่สำคัญของระบบไปป์ไลน์ของเรา เนื่องจากช่วยให้แอปพลิเคชันอิสระหลายตัวสามารถสื่อสารระหว่างกันได้ โดยเฉพาะอย่างยิ่ง มันทำงานเป็นตัวกลางให้เราส่งและรับข้อความระหว่างแอปพลิเคชัน สิ่งแรกที่เราต้องทำคือสร้างหัวข้อ (หัวข้อ) เพียงไปที่ Pub/Sub ในคอนโซลแล้วคลิกสร้างหัวข้อ

โค้ดด้านล่างเรียกสคริปต์ของเราเพื่อสร้างข้อมูลบันทึกที่กำหนดไว้ด้านบน จากนั้นเชื่อมต่อและส่งบันทึกไปยัง Pub/Sub สิ่งเดียวที่เราต้องทำคือสร้างวัตถุ ลูกค้าผู้จัดพิมพ์ระบุพาธธีมโดยใช้เมธอด topic_path และเรียกใช้ฟังก์ชัน publish с topic_path และข้อมูล โปรดทราบว่าเรากำลังนำเข้า generate_log_line จากสคริปต์ของเรา stream_logsดังนั้นตรวจสอบให้แน่ใจว่าไฟล์เหล่านั้นอยู่ในโฟลเดอร์เดียวกัน มิฉะนั้น คุณจะได้รับข้อผิดพลาดในการนำเข้า จากนั้นเราสามารถเรียกใช้ผ่านคอนโซล Google ของเราโดยใช้:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

เมื่อรันไฟล์แล้ว เราจะสามารถสังเกตเอาต์พุตของข้อมูลบันทึกไปยังคอนโซลได้ดังแสดงในรูปด้านล่าง สคริปต์นี้จะทำงานตราบเท่าที่เราไม่ได้ใช้ CTRL + Cเพื่อให้มันสมบูรณ์

เราสร้างไปป์ไลน์สำหรับการประมวลผลข้อมูลแบบสตรีม ส่วนที่ 2
รูปที่ 4 บทสรุป publish_logs.py

การเขียน Pipeline Code ของเรา

ตอนนี้เราได้ตั้งค่าทุกอย่างเรียบร้อยแล้ว เข้าสู่ส่วนที่สนุก นั่นก็คือ การเขียนโค้ดไปป์ไลน์ของเราโดยใช้ Beam และ Python ในการสร้าง Beam ไปป์ไลน์ เราจำเป็นต้องสร้างวัตถุไปป์ไลน์ (p) เมื่อเราสร้างวัตถุไปป์ไลน์แล้ว เราสามารถใช้หลายฟังก์ชันทีละฟังก์ชันโดยใช้โอเปอเรเตอร์ pipe (|). โดยทั่วไปแล้วเวิร์กโฟลว์จะมีลักษณะดังรูปด้านล่าง

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

ในโค้ดของเรา เราจะสร้างฟังก์ชันที่กำหนดโดยผู้ใช้สองฟังก์ชัน การทำงาน regex_cleanซึ่งจะสแกนข้อมูลและแยกสตริงที่ตรงกันตามรายการของรูปแบบโดยใช้ฟังก์ชัน re.search. ฟังก์ชันส่งคืนสตริงที่คั่นด้วยเครื่องหมายจุลภาค หากคุณไม่ใช่ผู้เชี่ยวชาญเกี่ยวกับนิพจน์ทั่วไป เราขอแนะนำให้ลองดูสิ่งนี้ กวดวิชา และฝึกฝนในแผ่นจดบันทึกเพื่อทดสอบโค้ด จากนั้นเราจะกำหนดฟังก์ชัน ParDo แบบกำหนดเองที่เรียกว่า แยกซึ่งเป็นรูปแบบหนึ่งของการแปลงลำแสงสำหรับการประมวลผลแบบขนาน ใน Python สิ่งนี้ทำด้วยวิธีพิเศษ - เราต้องสร้างคลาสที่สืบทอดมาจากคลาส DoFn Beam ฟังก์ชัน Split นำสตริงที่แยกวิเคราะห์จากฟังก์ชันก่อนหน้าและส่งคืนรายการพจนานุกรมพร้อมคีย์ที่สอดคล้องกับชื่อคอลัมน์ในตาราง BigQuery ของเรา มีบางอย่างที่ควรทราบเกี่ยวกับฟังก์ชันนี้: ฉันต้องนำเข้า datetime ภายในฟังก์ชั่นเพื่อให้มันทำงานได้ ฉันได้รับข้อผิดพลาดในการนำเข้าที่จุดเริ่มต้นของไฟล์ ซึ่งเป็นเรื่องแปลก รายการนี้จะถูกส่งต่อไปยังฟังก์ชัน เขียนถึงBigQueryซึ่งเพียงแค่เพิ่มข้อมูลของเราลงในตาราง รหัสสำหรับ Batch DataFlow Job และ Streaming DataFlow Job ระบุไว้ด้านล่าง ข้อแตกต่างเพียงอย่างเดียวระหว่างรหัสชุดและสตรีมคือชุดที่เราอ่านจาก CSV src_pathการใช้ฟังก์ชัน ReadFromText จาก บีม.

Batch DataFlow Job (การประมวลผลเป็นชุด)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

งานการสตรีม DataFlow (การประมวลผลสตรีม)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

กำลังดำเนินการไปป์ไลน์

เราสามารถเริ่มไปป์ไลน์ได้หลายวิธี หากเราต้องการ เราก็สามารถเรียกใช้จากเทอร์มินัลในเครื่องได้โดยการลงชื่อเข้าใช้ GCP จากระยะไกล

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

อย่างไรก็ตาม เราจะเรียกใช้ด้วย DataFlow เราสามารถทำได้ด้วยคำสั่งด้านล่างโดยตั้งค่าพารามิเตอร์ที่จำเป็นต่อไปนี้

  • project - ID ของโครงการ GCP ของคุณ
  • runner เป็นไปป์ไลน์รันเนอร์ที่จะแยกวิเคราะห์โปรแกรมของคุณและสร้างไปป์ไลน์ของคุณ หากต้องการเรียกใช้ในระบบคลาวด์ คุณต้องระบุ DataflowRunner
  • staging_location เป็นเส้นทางไปยังที่เก็บข้อมูลบนคลาวด์ของ Cloud Dataflow สำหรับการจัดทำดัชนีแพ็คเกจรหัสที่จำเป็นโดยตัวจัดการที่ทำงาน
  • temp_location - เส้นทางไปยังที่เก็บข้อมูลบนคลาวด์ Cloud Dataflow สำหรับวางไฟล์งานชั่วคราวที่สร้างขึ้นระหว่างการทำงานของไปป์ไลน์
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

ขณะที่คำสั่งนี้ทำงานอยู่ เราสามารถไปที่แท็บ DataFlow ในคอนโซลของ Google และดูไปป์ไลน์ของเราได้ เมื่อคลิกที่ไปป์ไลน์ เราควรจะเห็นบางอย่างที่คล้ายกับรูปที่ 4 สำหรับจุดประสงค์ในการแก้ไขจุดบกพร่อง การไปที่บันทึกแล้วไปที่ Stackdriver เพื่อดูบันทึกโดยละเอียดจะมีประโยชน์มาก สิ่งนี้ช่วยให้ฉันแก้ปัญหาเกี่ยวกับไปป์ไลน์ได้ในหลายกรณี

เราสร้างไปป์ไลน์สำหรับการประมวลผลข้อมูลแบบสตรีม ส่วนที่ 2
รูปที่ 4: ท่อส่งลำแสง

การเข้าถึงข้อมูลของเราใน BigQuery

ดังนั้นเราควรมีไปป์ไลน์ที่ทำงานกับข้อมูลเข้ามาในตารางของเราแล้ว ในการทดสอบนี้ เราสามารถไปที่ BigQuery และดูข้อมูลได้ หลังจากใช้คำสั่งด้านล่าง คุณควรเห็นบรรทัดแรกของชุดข้อมูล ตอนนี้เรามีข้อมูลที่จัดเก็บไว้ใน BigQuery แล้ว เราสามารถทำการวิเคราะห์เพิ่มเติม รวมทั้งแชร์ข้อมูลกับเพื่อนร่วมงานและเริ่มตอบคำถามทางธุรกิจได้

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

เราสร้างไปป์ไลน์สำหรับการประมวลผลข้อมูลแบบสตรีม ส่วนที่ 2
รูปที่ 5: BigQuery

ข้อสรุป

เราหวังว่าโพสต์นี้จะเป็นตัวอย่างที่มีประโยชน์ในการสร้างไปป์ไลน์ข้อมูลการสตรีม ตลอดจนหาวิธีทำให้ข้อมูลเข้าถึงได้มากขึ้น การจัดเก็บข้อมูลในรูปแบบนี้ทำให้เรามีข้อดีหลายประการ ตอนนี้เราสามารถเริ่มตอบคำถามสำคัญ เช่น มีคนใช้ผลิตภัณฑ์ของเรากี่คน? ฐานผู้ใช้เติบโตขึ้นเมื่อเวลาผ่านไปหรือไม่? ผู้คนมีปฏิสัมพันธ์กับผลิตภัณฑ์ในแง่มุมใดมากที่สุด และมีข้อผิดพลาดที่ไม่ควรอยู่หรือไม่? คำถามเหล่านี้เป็นคำถามที่น่าสนใจสำหรับองค์กร จากข้อมูลเชิงลึกที่ได้จากคำตอบของคำถามเหล่านี้ เราจะสามารถปรับปรุงผลิตภัณฑ์และเพิ่มการมีส่วนร่วมของผู้ใช้ได้

Beam มีประโยชน์มากสำหรับการออกกำลังกายประเภทนี้ และยังมีกรณีการใช้งานอื่นๆ ที่น่าสนใจอีกมากมาย ตัวอย่างเช่น คุณสามารถวิเคราะห์ข้อมูลการขึ้นหุ้นแบบเรียลไทม์และทำการซื้อขายตามการวิเคราะห์ บางทีคุณอาจมีข้อมูลเซ็นเซอร์ที่มาจากยานพาหนะและต้องการคำนวณระดับการจราจร ตัวอย่างเช่น คุณสามารถเป็นบริษัทเกมที่รวบรวมข้อมูลผู้ใช้และใช้ข้อมูลนั้นเพื่อสร้างแดชบอร์ดเพื่อติดตามเมตริกหลัก โอเค สุภาพบุรุษ นี่คือหัวข้อสำหรับโพสต์อื่น ขอบคุณที่อ่าน และสำหรับผู้ที่ต้องการดูโค้ดแบบเต็ม ด้านล่างนี้คือลิงก์ไปยัง GitHub ของฉัน

https://github.com/DFoly/User_log_pipeline

นั่นคือทั้งหมดที่ อ่านส่วนแรก.

ที่มา: will.com

เพิ่มความคิดเห็น