我們創建一個串流資料處理管道。 第2部分

大家好。 我們正在分享文章最後部分的翻譯,這是專門為該課程的學生準備的。 數據工程師。 你可以閱讀第一部分 這裡.

用於即時管道的 Apache Beam 和 DataFlow

我們創建一個串流資料處理管道。 第2部分

設定 Google 雲

注意:我使用 Google Cloud Shell 來運行管道並發布自訂日誌數據,因為我在 Python 3 中運行管道時遇到問題。Google Cloud Shell 使用 Python 2,它與 Apache Beam 更加一致。

要啟動管道,我們需要深入了解設定。 對於先前未使用過 GCP 的用戶,您需要按照本指南中概述的以下 6 個步驟進行操作 .

之後,我們需要將腳本上傳到 Google Cloud Storage 並將它們複製到 Google Cloud Shel。 上傳到雲端儲存非常簡單(可以找到說明 這裡)。 要複製文件,我們可以透過點擊下面圖 2 中左側的第一個圖示從工具列開啟 Google Cloud Shel。

我們創建一個串流資料處理管道。 第2部分
圖2

下面列出了複製檔案和安裝所需庫所需的命令。

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

建立我們的資料庫和表

完成所有設定相關步驟後,接下來我們需要做的是在 BigQuery 中建立資料集和表格。 有多種方法可以執行此操作,但最簡單的方法是先建立資料集來使用 Google Cloud 控制台。 您可以按照以下步驟操作 鏈接建立帶有架構的表。 我們的桌子上會有 7 列,對應每個使用者日誌的組成部分。 為了方便起見,我們將 timelocal 變數以外的所有列定義為字串,並根據我們先前產生的變數命名它們。 我們的表格的佈局應如圖 3 所示。

我們創建一個串流資料處理管道。 第2部分
圖 3. 表格佈局

發布使用者日誌數據

Pub/Sub 是我們管道的關鍵元件,因為它允許多個獨立應用程式相互通訊。 特別是,它充當中介,允許我們在應用程式之間發送和接收訊息。 我們需要做的第一件事是創建一個主題。 只需轉到控制台中的 Pub/Sub 並點擊“建立主題”即可。

下面的程式碼呼叫我們的腳本來產生上面定義的日誌數據,然後連接並將日誌傳送到 Pub/Sub。 我們唯一需要做的就是創建一個對象 發布者客戶端,使用該方法指定主題的路徑 topic_path 並呼叫該函數 publish с topic_path 和數據。 請注意,我們導入 generate_log_line 從我們的腳本 stream_logs,因此請確保這些文件位於同一資料夾中,否則您將收到匯入錯誤。 然後我們可以使用以下命令透過我們的Google控制台運行它:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

文件一運行,我們就能在控制台看到日誌資料的輸出,如下圖所示。 只要我們不使用這個腳本就會起作用 CTRL + C完成它。

我們創建一個串流資料處理管道。 第2部分
圖 4. 輸出 publish_logs.py

編寫我們的管道程式碼

現在我們已經準備好了一切,我們可以開始有趣的部分 - 使用 Beam 和 Python 編碼我們的管道。 要建立 Beam 管道,我們需要建立一個管道物件 (p)。 建立管道物件後,我們可以使用運算子依序套用多個函數 pipe (|)。 一般來說,工作流程如下圖。

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

在我們的程式碼中,我們將建立兩個自訂函數。 功能 regex_clean,它使用函數掃描資料並根據 PATTERNS 列表檢索相應的行 re.search。 該函數傳回一個逗號分隔的字串。 如果您不是正規表示式專家,我建議您查看此內容 教學 並在記事本中練習檢查程式碼。 之後我們定義一個名為的自訂 ParDo 函數 分裂,這是用於並行處理的 Beam 變換的一種變體。 在 Python 中,這是透過一種特殊的方式完成的 - 我們必須建立一個繼承自 DoFn Beam 類別的類別。 Split 函數從上一個函數中取得解析後的行,並傳回一個字典列表,其中的鍵與 BigQuery 表中的列名稱相對應。 關於這個函數有一點要注意:我必須導入 datetime 在函數內部使其工作。 我在文件開頭收到導入錯誤,這很奇怪。 然後將該列表傳遞給函數 寫入大查詢,它只是將我們的數據添加到表中。 下面給出了 Batch DataFlow Job 和 Streaming DataFlow Job 的程式碼。 批次和串流程式碼之間的唯一區別是,我們在批次中讀取 CSV src_path使用該功能 ReadFromText 來自梁。

批次資料流作業(批次)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

流資料流作業(流處理)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

啟動輸送機

我們可以透過幾種不同的方式來運行管道。 如果我們願意,我們可以在遠端登入 GCP 的同時從終端本地運行它。

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

但是,我們將使用 DataFlow 來運行它。 我們可以使用以下命令透過設定以下必需參數來完成此操作。

  • project — 您的 GCP 項目的 ID。
  • runner 是一個管道運行程序,它將分析您的程序並建立管道。 要在雲端中運行,您必須指定 DataflowRunner。
  • staging_location — Cloud Dataflow 雲端儲存的路徑,用於索引執行工作的處理器所需的程式碼包。
  • temp_location — Cloud Dataflow 雲端儲存的路徑,用於儲存管道執行時間所建立的暫存作業檔案。
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

當此命令運行時,我們可以轉到 google 控制台中的 DataFlow 選項卡並查看我們的管道。 當我們單擊管道時,我們應該看到類似於圖 4 的內容。出於調試目的,請轉到日誌,然後前往 Stackdriver 查看詳細日誌會非常有幫助。 這幫助我解決了許多案例中的管道問題。

我們創建一個串流資料處理管道。 第2部分
圖 4:樑式輸送機

在 BigQuery 中存取我們的數​​據

因此,我們應該已經有一個正在運作的管道,資料流入我們的表格。 為了測試這一點,我們可以前往 BigQuery 並查看資料。 使用下面的命令後,您應該會看到資料集的前幾行。 現在我們已經將資料儲存在 BigQuery 中,我們可以進行進一步的分析,以及與同事分享資料並開始回答業務問題。

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

我們創建一個串流資料處理管道。 第2部分
圖 5:BigQuery

結論

我們希望這篇文章能夠成為創建流資料管道以及尋找使資料更易於存取的方法的有用範例。 以這種格式儲存資料為我們帶來了很多優勢。 現在我們可以開始回答重要的問題,例如有多少人使用我們的產品? 您的用戶群是否隨著時間的推移而成長? 人們與產品的哪些面向互動最多? 是否存在不該出現的錯誤? 這些是組織感興趣的問題。 根據這些問題的答案得出的見解,我們可以改進產品並提高用戶參與度。

Beam 對於此類練習非常有用,並且還有許多其他有趣的用例。 例如,您可能想要即時分析股票價格數據並根據分析進行交易,也許您有來自車輛的傳感器數據並想要計算流量水平。 例如,您也可以是一家收集用戶資料並使用其創建儀表板來追蹤關鍵指標的遊戲公司。 好的,先生們,這是另一篇文章的主題,感謝您的閱讀,對於那些想查看完整程式碼的人,以下是我的 GitHub 的連結。

https://github.com/DFoly/User_log_pipeline

這就是全部。 閱讀第一部分.

來源: www.habr.com

添加評論