我们创建一个流数据处理管道。 第2部分

大家好。 我们正在分享文章最后部分的翻译,这是专门为该课程的学生准备的。 数据工程师。 你可以阅读第一部分 这里.

用于实时管道的 Apache Beam 和 DataFlow

我们创建一个流数据处理管道。 第2部分

设置 Google 云

注意:我使用 Google Cloud Shell 来运行管道并发布自定义日志数据,因为我在 Python 3 中运行管道时遇到问题。Google Cloud Shell 使用 Python 2,它与 Apache Beam 更加一致。

要启动管道,我们需要深入了解一下设置。 对于之前未使用过 GCP 的用户,您需要按照本指南中概述的以下 6 个步骤进行操作 .

之后,我们需要将脚本上传到 Google Cloud Storage 并将它们复制到 Google Cloud Shel。 上传到云存储非常简单(可以找到说明 这里)。 要复制文件,我们可以通过单击下面图 2 中左侧的第一个图标从工具栏打开 Google Cloud Shel。

我们创建一个流数据处理管道。 第2部分
图2

下面列出了复制文件和安装所需库所需的命令。

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

创建我们的数据库和表

完成所有设置相关步骤后,接下来我们需要做的是在 BigQuery 中创建数据集和表。 有多种方法可以执行此操作,但最简单的方法是首先创建数据集来使用 Google Cloud 控制台。 您可以按照以下步骤操作 链接创建带有架构的表。 我们的桌子上会有 7 列,对应每个用户日志的组成部分。 为了方便起见,我们将除 timelocal 变量之外的所有列定义为字符串,并根据我们之前生成的变量命名它们。 我们的表的布局应如图 3 所示。

我们创建一个流数据处理管道。 第2部分
图 3. 表布局

发布用户日志数据

Pub/Sub 是我们管道的关键组件,因为它允许多个独立应用程序相互通信。 特别是,它充当中介,允许我们在应用程序之间发送和接收消息。 我们需要做的第一件事是创建一个主题。 只需转到控制台中的 Pub/Sub 并单击“创建主题”即可。

下面的代码调用我们的脚本来生成上面定义的日志数据,然后连接并将日志发送到 Pub/Sub。 我们唯一需要做的就是创建一个对象 发布者客户端,使用该方法指定主题的路径 topic_path 并调用该函数 publish с topic_path 和数据。 请注意,我们导入 generate_log_line 从我们的脚本 stream_logs,因此请确保这些文件位于同一文件夹中,否则您将收到导入错误。 然后我们可以使用以下命令通过我们的谷歌控制台运行它:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

文件一运行,我们就能在控制台看到日志数据的输出,如下图所示。 只要我们不使用这个脚本就会起作用 CTRL + C完成它。

我们创建一个流数据处理管道。 第2部分
图 4. 输出 publish_logs.py

编写我们的管道代码

现在我们已经准备好了一切,我们可以开始有趣的部分 - 使用 Beam 和 Python 编码我们的管道。 要创建 Beam 管道,我们需要创建一个管道对象 (p)。 创建管道对象后,我们可以使用运算符依次应用多个函数 pipe (|)。 一般来说,工作流程如下图所示。

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

在我们的代码中,我们将创建两个自定义函数。 功能 regex_clean,它使用函数扫描数据并根据 PATTERNS 列表检索相应的行 re.search。 该函数返回一个逗号分隔的字符串。 如果您不是正则表达式专家,我建议您查看此内容 教程 并在记事本中练习检查代码。 之后我们定义一个名为的自定义 ParDo 函数 分裂,这是用于并行处理的 Beam 变换的一种变体。 在 Python 中,这是通过一种特殊的方式完成的 - 我们必须创建一个继承自 DoFn Beam 类的类。 Split 函数从上一个函数中获取解析后的行,并返回一个字典列表,其中的键与 BigQuery 表中的列名称相对应。 关于这个函数有一点需要注意:我必须导入 datetime 在函数内部使其工作。 我在文件开头收到导入错误,这很奇怪。 然后将该列表传递给函数 写入大查询,它只是将我们的数据添加到表中。 下面给出了 Batch DataFlow Job 和 Streaming DataFlow Job 的代码。 批处理和流式代码之间的唯一区别是,我们在批处理中读取 CSV src_path使用功能 ReadFromText 来自梁。

批处理数据流作业(批处理)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

流数据流作业(流处理)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

启动输送机

我们可以通过几种不同的方式运行管道。 如果我们愿意,我们可以在远程登录 GCP 的同时从终端本地运行它。

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

但是,我们将使用 DataFlow 来运行它。 我们可以使用以下命令通过设置以下必需参数来完成此操作。

  • project — 您的 GCP 项目的 ID。
  • runner 是一个管道运行程序,它将分析您的程序并构建管道。 要在云中运行,您必须指定 DataflowRunner。
  • staging_location — Cloud Dataflow 云存储的路径,用于索引执行工作的处理器所需的代码包。
  • temp_location — Cloud Dataflow 云存储的路径,用于存储管道运行时创建的临时作业文件。
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

当此命令运行时,我们可以转到 google 控制台中的 DataFlow 选项卡并查看我们的管道。 当我们单击管道时,我们应该看到类似于图 4 的内容。出于调试目的,转到日志,然后转到 Stackdriver 查看详细日志会非常有帮助。 这帮助我解决了许多案例中的管道问题。

我们创建一个流数据处理管道。 第2部分
图 4:梁式输送机

在 BigQuery 中访问我们的数据

因此,我们应该已经有一个正在运行的管道,数据流入我们的表。 为了测试这一点,我们可以转到 BigQuery 并查看数据。 使用下面的命令后,您应该看到数据集的前几行。 现在我们已经将数据存储在 BigQuery 中,我们可以进行进一步的分析,以及与同事共享数据并开始回答业务问题。

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

我们创建一个流数据处理管道。 第2部分
图 5:BigQuery

结论

我们希望这篇文章能够成为创建流数据管道以及寻找使数据更易于访问的方法的有用示例。 以这种格式存储数据给我们带来了很多优势。 现在我们可以开始回答重要的问题,例如有多少人使用我们的产品? 您的用户群是否随着时间的推移而增长? 人们与产品的哪些方面互动最多? 是否存在不应该出现的错误? 这些是组织感兴趣的问题。 根据这些问题的答案得出的见解,我们可以改进产品并提高用户参与度。

Beam 对于此类练习非常有用,并且还有许多其他有趣的用例。 例如,您可能想要实时分析股票价格数据并根据分析进行交易,也许您有来自车辆的传感器数据并想要计算流量水平。 例如,您也可以是一家收集用户数据并使用其创建仪表板来跟踪关键指标的游戏公司。 好的,先生们,这是另一篇文章的主题,感谢您的阅读,对于那些想查看完整代码的人,下面是我的 GitHub 的链接。

https://github.com/DFoly/User_log_pipeline

就是这样。 阅读第一部分.

来源: habr.com

添加评论