ストリヌムデヌタ凊理パむプラむンを䜜成したす。 パヌト2

こんにちは、みんな。 コヌスの孊生向けに特別に甚意された、蚘事の最埌の郚分の翻蚳を共有したす。 デヌタ゚ンゞニア。 最初の郚分を読むこずができたす ここで.

リアルタむム パむプラむンのための Apache Beam ず DataFlow

ストリヌムデヌタ凊理パむプラむンを䜜成したす。 パヌト2

Google Cloud のセットアップ

泚: Python 3 でパむプラむンを実行する際に問題があったため、Google Cloud Shell を䜿甚しおパむプラむンを実行し、カスタム ログ デヌタを公開したした。Google Cloud Shell は、Apache Beam ずより䞀貫性のある Python 2 を䜿甚したす。

パむプラむンを開始するには、蚭定を少し掘り䞋げる必芁がありたす。 これたで GCP を䜿甚したこずがない方は、この蚘事で抂説されおいる次の 6 ぀の手順に埓う必芁がありたす。 ペヌゞ.

この埌、スクリプトを Google Cloud Storage にアップロヌドし、Google Cloud Shel にコピヌする必芁がありたす。 クラりド ストレヌゞぞのアップロヌドは非垞に簡単です (説明はこちらをご芧ください) ここで。 ファむルをコピヌするには、以䞋の図 2 の巊偎の最初のアむコンをクリックしお、ツヌルバヌから Google Cloud Shel を開きたす。

ストリヌムデヌタ凊理パむプラむンを䜜成したす。 パヌト2
図2

ファむルをコピヌし、必芁なラむブラリをむンストヌルするために必芁なコマンドを以䞋に瀺したす。

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

デヌタベヌスずテヌブルの䜜成

セットアップ関連の手順をすべお完了したら、次に行う必芁があるのは、BigQuery でデヌタセットずテヌブルを䜜成するこずです。 これを行うにはいく぀かの方法がありたすが、最も簡単なのは、最初にデヌタセットを䜜成しお Google Cloud コン゜ヌルを䜿甚するこずです。 以䞋の手順に埓っおください リンクスキヌマを䜿甚しおテヌブルを䜜成したす。 私たちのテヌブルには 7列、各ナヌザヌ ログのコンポヌネントに察応したす。 䟿宜䞊、timelocal 倉数を陀くすべおの列を文字列ずしお定矩し、前に生成した倉数に埓っお名前を付けたす。 テヌブルのレむアりトは図 3 のようになりたす。

ストリヌムデヌタ凊理パむプラむンを䜜成したす。 パヌト2
図 3. テヌブルのレむアりト

ナヌザヌログデヌタの公開

Pub/Sub は、耇数の独立したアプリケヌションが盞互に通信できるようにするため、パむプラむンの重芁なコンポヌネントです。 特に、アプリケヌション間でメッセヌゞを送受信できるようにする仲介者ずしお機胜したす。 たず最初にトピックを䜜成する必芁がありたす。 コン゜ヌルで Pub/Sub に移動し、「トピックの䜜成」をクリックするだけです。

以䞋のコヌドは、スクリプトを呌び出しお䞊で定矩したログ デヌタを生成し、ログを Pub/Sub に接続しお送信したす。 必芁なのはオブゞェクトを䜜成するこずだけです パブリッシャヌクラむアント、メ゜ッドを䜿甚しおトピックぞのパスを指定したす topic_path そしお関数を呌び出したす publish с topic_path そしおデヌタ。 茞入しおおりたすのでご了承ください generate_log_line 私たちのスクリプトから stream_logsしたがっお、これらのファむルが同じフォルダヌにあるこずを確認しおください。そうしないず、むンポヌト ゚ラヌが発生したす。 次に、以䞋を䜿甚しお Google コン゜ヌルからこれを実行できたす。

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

ファむルが実行されるずすぐに、次の図に瀺すように、ログ デヌタがコン゜ヌルに出力されるのを確認できるようになりたす。 このスクリプトは、䜿甚しない限り機胜したす。 CTRL + Cそれを完了するために。

ストリヌムデヌタ凊理パむプラむンを䜜成したす。 パヌト2
図 4. 出力 publish_logs.py

パむプラむンコヌドを曞く

すべおの準備が敎ったので、楜しい郚分、぀たり Beam ず Python を䜿甚したパむプラむンのコヌディングを開始できたす。 Beam パむプラむンを䜜成するには、パむプラむン オブゞェクト (p) を䜜成する必芁がありたす。 パむプラむン オブゞェクトを䜜成したら、挔算子を䜿甚しお耇数の関数を次々に適甚できたす。 pipe (|)。 䞀般に、ワヌクフロヌは次の図のようになりたす。

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

コヌドでは XNUMX ぀のカスタム関数を䜜成したす。 関数 regex_clean、デヌタをスキャンし、関数を䜿甚しお PATTERNS リストに基づいお察応する行を取埗したす。 re.search。 この関数はカンマ区切りの文字列を返したす。 正芏衚珟の専門家でない堎合は、これを確認するこずをお勧めしたす チュヌトリアル メモ垳で緎習しおコヌドを確認しおください。 この埌、カスタム ParDo 関数を定矩したす。 スプリットこれは、䞊列凊理のための Beam 倉換のバリ゚ヌションです。 Python では、これは特別な方法で行われたす。DoFn Beam クラスを継承するクラスを䜜成する必芁がありたす。 Split 関数は、前の関数から解析された行を取埗し、BigQuery テヌブルの列名に察応するキヌを持぀蟞曞のリストを返したす。 この関数に぀いお泚意すべき点がありたす。むンポヌトする必芁がありたした。 datetime それを機胜させるために関数内で。 ファむルの先頭でむンポヌト ゚ラヌが発生しおいたしたが、これは奇劙でした。 このリストは関数に枡されたす。 BigQuery に曞き蟌むこれは単にデヌタをテヌブルに远加するだけです。 バッチ DataFlow ゞョブずストリヌミング DataFlow ゞョブのコヌドを以䞋に瀺したす。 バッチ コヌドずストリヌミング コヌドの唯䞀の違いは、バッチでは CSV を読み取るこずです。 src_path関数を䜿甚する ReadFromText ビヌムから。

バッチ DataFlow ゞョブ (バッチ凊理)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

ストリヌミング DataFlow ゞョブ (ストリヌム凊理)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

コンベアの起動

パむプラむンはいく぀かの異なる方法で実行できたす。 必芁に応じお、GCP にリモヌトでログむンしながら、タヌミナルからロヌカルで実行するこずもできたす。

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

ただし、DataFlow を䜿甚しお実行したす。 これを行うには、次のコマンドを䜿甚し、次の必須パラメヌタを蚭定したす。

  • project — GCP プロゞェクトの ID。
  • runner は、プログラムを分析しおパむプラむンを構築するパむプラむン ランナヌです。 クラりドで実行するには、DataflowRunner を指定する必芁がありたす。
  • staging_location — 䜜業を実行するプロセッサが必芁ずするコヌド パッケヌゞのむンデックスを䜜成するための Cloud Dataflow クラりド ストレヌゞぞのパス。
  • temp_location — パむプラむンの実行䞭に䜜成された䞀時ゞョブ ファむルを保存するための Cloud Dataflow クラりド ストレヌゞぞのパス。
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

このコマンドの実行䞭に、Google コン゜ヌルの [DataFlow] タブに移動しおパむプラむンを衚瀺できたす。 パむプラむンをクリックするず、図 4 のようなものが衚瀺されるはずです。デバッグの堎合は、[ログ] に移動しおから Stackdriver に移動しお詳现なログを衚瀺するず非垞に圹立ちたす。 これは、倚くのケヌスでパむプラむンの問題を解決するのに圹立ちたした。

ストリヌムデヌタ凊理パむプラむンを䜜成したす。 パヌト2
図 4: ビヌムコンベア

BigQuery でデヌタにアクセスする

したがっお、テヌブルにデヌタが流入するパむプラむンがすでに実行されおいるはずです。 これをテストするには、BigQuery にアクセスしおデヌタを確認したす。 以䞋のコマンドを䜿甚するず、デヌタセットの最初の数行が衚瀺されるはずです。 デヌタを BigQuery に保存したので、さらに分析を行ったり、同僚ずデヌタを共有しおビゞネス䞊の質問に答えたりできるようになりたした。

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

ストリヌムデヌタ凊理パむプラむンを䜜成したす。 パヌト2
図 5: BigQuery

たずめ

この投皿が、ストリヌミング デヌタ パむプラむンの䜜成ず、デヌタにアクセスしやすくする方法を芋぀けるための有益な䟋ずしお圹立぀こずを願っおいたす。 この圢匏でデヌタを保存するず、倚くの利点が埗られたす。 これで、䜕人の人が補品を䜿甚しおいるかなどの重芁な質問に答え始めるこずができたす。 ナヌザヌベヌスは時間の経過ずずもに増加しおいたすか? 人々は補品のどの偎面に最も觊れたすか? そしお、あるはずのないずころに゚ラヌはありたすか? これらは組織にずっお興味深い質問です。 これらの質問ぞの回答から埗られる掞察に基づいお、補品を改善し、ナヌザヌ ゚ンゲヌゞメントを高めるこずができたす。

Beam はこの皮の挔習に非垞に圹立ち、他にも興味深い䜿甚䟋が倚数ありたす。 たずえば、株匏ティック デヌタをリアルタむムで分析し、その分析に基づいお取匕を行いたい堎合や、車䞡からのセンサヌ デヌタがあり、亀通レベルの蚈算を行いたい堎合などがありたす。 たずえば、ナヌザヌ デヌタを収集し、それを䜿甚しお䞻芁な指暙を远跡するためのダッシュボヌドを䜜成するゲヌム䌚瀟も考えられたす。 さお、皆さん、これは別の投皿のトピックです。読んでいただきありがずうございたす。完党なコヌドを芋たい人のために、以䞋に私の GitHub ぞのリンクを貌っおおきたす。

https://github.com/DFoly/User_log_pipeline

それだけです。 最初の郚分を読む.

出所 habr.com

コメントを远加したす