Chúng tôi tạo ra một đường ống xử lý dữ liệu luồng. Phần 2

Chào mọi người. Chúng tôi chia sẻ bản dịch phần cuối cùng của bài viết, được chuẩn bị riêng cho học viên của khóa học. Kỹ sư dữ liệu. Bạn có thể đọc phần đầu tiên đây.

Apache Beam và DataFlow cho đường ống thời gian thực

Chúng tôi tạo ra một đường ống xử lý dữ liệu luồng. Phần 2

Thiết lập Google Cloud

Lưu ý: Tôi đã sử dụng Google Cloud Shell để chạy quy trình và xuất bản dữ liệu nhật ký tùy chỉnh vì tôi gặp sự cố khi chạy quy trình trong Python 3. Google Cloud Shell sử dụng Python 2, phù hợp hơn với Apache Beam.

Để bắt đầu quy trình, chúng ta cần tìm hiểu sâu một chút về cài đặt. Đối với những người chưa sử dụng GCP trước đây, bạn sẽ cần làm theo 6 bước sau được nêu trong phần này trang.

Sau này, chúng tôi sẽ cần tải tập lệnh của mình lên Google Cloud Storage và sao chép chúng vào Google Cloud Shel. Việc tải lên bộ lưu trữ đám mây khá đơn giản (có thể tìm thấy mô tả đây). Để sao chép các tệp của mình, chúng ta có thể mở Google Cloud Shel từ thanh công cụ bằng cách nhấp vào biểu tượng đầu tiên bên trái trong Hình 2 bên dưới.

Chúng tôi tạo ra một đường ống xử lý dữ liệu luồng. Phần 2
Hình 2

Các lệnh chúng ta cần để sao chép các tệp và cài đặt các thư viện cần thiết được liệt kê bên dưới.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Tạo cơ sở dữ liệu và bảng của chúng tôi

Khi chúng tôi đã hoàn thành tất cả các bước liên quan đến thiết lập, điều tiếp theo chúng tôi cần làm là tạo tập dữ liệu và bảng trong BigQuery. Có một số cách để thực hiện việc này nhưng cách đơn giản nhất là sử dụng bảng điều khiển Google Cloud bằng cách tạo tập dữ liệu trước tiên. Bạn có thể làm theo các bước dưới đây liên kếtđể tạo một bảng có lược đồ. Bàn của chúng ta sẽ có 7 cột, tương ứng với các thành phần của từng nhật ký người dùng. Để thuận tiện, chúng tôi sẽ xác định tất cả các cột dưới dạng chuỗi, ngoại trừ biến timelocal và đặt tên chúng theo các biến chúng tôi đã tạo trước đó. Bố cục bảng của chúng ta sẽ giống như trong Hình 3.

Chúng tôi tạo ra một đường ống xử lý dữ liệu luồng. Phần 2
Hình 3. Bố cục bảng

Xuất bản dữ liệu nhật ký người dùng

Pub/Sub là một thành phần quan trọng trong quy trình của chúng tôi vì nó cho phép nhiều ứng dụng độc lập giao tiếp với nhau. Đặc biệt, nó hoạt động như một trung gian cho phép chúng ta gửi và nhận tin nhắn giữa các ứng dụng. Việc đầu tiên chúng ta cần làm là tạo một chủ đề. Chỉ cần truy cập Pub/Sub trong bảng điều khiển và nhấp vào TẠO CHỦ ĐỀ.

Đoạn mã bên dưới gọi tập lệnh của chúng tôi để tạo dữ liệu nhật ký được xác định ở trên, sau đó kết nối và gửi nhật ký đến Pub/Sub. Điều duy nhất chúng ta cần làm là tạo một đối tượng Nhà xuất bảnKhách hàng, chỉ định đường dẫn đến chủ đề bằng phương thức topic_path và gọi hàm publish с topic_path và dữ liệu. Xin lưu ý rằng chúng tôi nhập khẩu generate_log_line từ kịch bản của chúng tôi stream_logs, vì vậy hãy đảm bảo các tệp này nằm trong cùng một thư mục, nếu không bạn sẽ gặp lỗi nhập. Sau đó, chúng tôi có thể chạy nó thông qua bảng điều khiển Google bằng cách sử dụng:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Ngay khi tệp chạy, chúng ta sẽ có thể thấy đầu ra của dữ liệu nhật ký ra bảng điều khiển, như trong hình bên dưới. Tập lệnh này sẽ hoạt động miễn là chúng ta không sử dụng CTRL + Cđể hoàn thành nó.

Chúng tôi tạo ra một đường ống xử lý dữ liệu luồng. Phần 2
Hình 4. Đầu ra publish_logs.py

Viết mã đường dẫn của chúng tôi

Bây giờ chúng ta đã chuẩn bị xong mọi thứ, chúng ta có thể bắt đầu phần thú vị - mã hóa quy trình của mình bằng Beam và Python. Để tạo một đường ống Beam, chúng ta cần tạo một đối tượng đường ống (p). Khi chúng ta đã tạo một đối tượng đường ống, chúng ta có thể áp dụng lần lượt nhiều hàm bằng cách sử dụng toán tử pipe (|). Nói chung, quy trình làm việc trông giống như hình ảnh bên dưới.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Trong mã của chúng tôi, chúng tôi sẽ tạo hai hàm tùy chỉnh. Chức năng regex_clean, quét dữ liệu và lấy hàng tương ứng dựa trên danh sách PATTERNS bằng hàm re.search. Hàm trả về một chuỗi được phân tách bằng dấu phẩy. Nếu bạn không phải là chuyên gia về biểu thức chính quy, tôi khuyên bạn nên kiểm tra điều này hướng dẫn và thực hành trên notepad để kiểm tra mã. Sau này, chúng tôi xác định một hàm ParDo tùy chỉnh được gọi là chẻ, đây là một biến thể của biến đổi Beam để xử lý song song. Trong Python, việc này được thực hiện theo cách đặc biệt - chúng ta phải tạo một lớp kế thừa từ lớp DoFn Beam. Hàm Split lấy hàng được phân tích cú pháp từ hàm trước đó và trả về danh sách từ điển có khóa tương ứng với tên cột trong bảng BigQuery của chúng tôi. Có một điều cần lưu ý về chức năng này: Tôi đã phải nhập datetime bên trong một hàm để làm cho nó hoạt động. Tôi gặp phải lỗi nhập ở đầu tệp, điều này thật kỳ lạ. Danh sách này sau đó được chuyển đến hàm WriteToBigQuery, việc này chỉ đơn giản là thêm dữ liệu của chúng tôi vào bảng. Mã cho Công việc DataFlow hàng loạt và Công việc truyền dữ liệu luồng dữ liệu được đưa ra bên dưới. Sự khác biệt duy nhất giữa mã hàng loạt và mã phát trực tuyến là chúng tôi đọc CSV từ hàng loạt src_pathsử dụng chức năng ReadFromText từ Beam.

Công việc DataFlow hàng loạt (xử lý hàng loạt)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Truyền phát công việc DataFlow (xử lý luồng)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Khởi động băng tải

Chúng ta có thể chạy đường ống theo nhiều cách khác nhau. Nếu muốn, chúng tôi có thể chạy nó cục bộ từ một thiết bị đầu cuối trong khi đăng nhập vào GCP từ xa.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Tuy nhiên, chúng tôi sẽ chạy nó bằng DataFlow. Chúng ta có thể thực hiện việc này bằng lệnh bên dưới bằng cách đặt các tham số bắt buộc sau.

  • project — ID của dự án GCP của bạn.
  • runner là người chạy quy trình sẽ phân tích chương trình của bạn và xây dựng quy trình của bạn. Để chạy trên đám mây, bạn phải chỉ định DataflowRunner.
  • staging_location — đường dẫn đến bộ lưu trữ đám mây Cloud Dataflow để lập chỉ mục các gói mã cần thiết cho bộ xử lý thực hiện công việc.
  • temp_location — đường dẫn đến bộ lưu trữ đám mây Cloud Dataflow để lưu trữ các tệp công việc tạm thời được tạo trong khi quy trình đang chạy.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Trong khi lệnh này đang chạy, chúng ta có thể chuyển đến tab DataFlow trong bảng điều khiển của Google và xem quy trình của chúng tôi. Khi nhấp vào quy trình, chúng ta sẽ thấy nội dung tương tự như Hình 4. Đối với mục đích gỡ lỗi, có thể rất hữu ích nếu đi tới Nhật ký và sau đó đến Stackdriver để xem nhật ký chi tiết. Điều này đã giúp tôi giải quyết các vấn đề về đường ống trong một số trường hợp.

Chúng tôi tạo ra một đường ống xử lý dữ liệu luồng. Phần 2
Hình 4: Băng tải dầm

Truy cập dữ liệu của chúng tôi trong BigQuery

Vì vậy, chúng ta đã có sẵn một đường dẫn chạy với dữ liệu chảy vào bảng của mình. Để kiểm tra điều này, chúng ta có thể truy cập BigQuery và xem dữ liệu. Sau khi sử dụng lệnh bên dưới, bạn sẽ thấy một vài hàng đầu tiên của tập dữ liệu. Giờ đây, khi đã lưu trữ dữ liệu trong BigQuery, chúng tôi có thể tiến hành phân tích sâu hơn cũng như chia sẻ dữ liệu với đồng nghiệp và bắt đầu trả lời các câu hỏi kinh doanh.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Chúng tôi tạo ra một đường ống xử lý dữ liệu luồng. Phần 2
Hình 5: BigQuery

Kết luận

Chúng tôi hy vọng bài đăng này đóng vai trò là một ví dụ hữu ích về việc tạo đường dẫn dữ liệu truyền phát cũng như tìm cách làm cho dữ liệu dễ truy cập hơn. Việc lưu trữ dữ liệu ở định dạng này mang lại cho chúng ta nhiều lợi ích. Bây giờ chúng ta có thể bắt đầu trả lời các câu hỏi quan trọng như có bao nhiêu người sử dụng sản phẩm của chúng ta? Cơ sở người dùng của bạn có tăng theo thời gian không? Mọi người tương tác với khía cạnh nào của sản phẩm nhiều nhất? Và có những lỗi nào không nên có? Đây là những câu hỏi sẽ được tổ chức quan tâm. Dựa trên những hiểu biết sâu sắc thu được từ câu trả lời cho những câu hỏi này, chúng tôi có thể cải thiện sản phẩm và tăng mức độ tương tác của người dùng.

Beam thực sự hữu ích cho loại bài tập này và còn có một số trường hợp sử dụng thú vị khác. Ví dụ: bạn có thể muốn phân tích dữ liệu về giá cổ phiếu trong thời gian thực và thực hiện giao dịch dựa trên phân tích đó, có thể bạn có dữ liệu cảm biến đến từ các phương tiện và muốn tính toán mức độ lưu thông. Ví dụ: bạn cũng có thể là một công ty trò chơi thu thập dữ liệu người dùng và sử dụng dữ liệu đó để tạo trang tổng quan nhằm theo dõi các số liệu chính. Được rồi, các quý ông, đây là chủ đề cho một bài đăng khác, cảm ơn vì đã đọc và đối với những ai muốn xem mã đầy đủ, bên dưới là liên kết tới GitHub của tôi.

https://github.com/DFoly/User_log_pipeline

Đó là tất cả. Đọc phần một.

Nguồn: www.habr.com

Thêm một lời nhận xét