ما یک خط لوله پردازش داده های جریانی ایجاد می کنیم. قسمت 2

سلام به همه. ترجمه قسمت پایانی مقاله را که به طور خاص برای دانشجویان دوره تهیه شده است را به اشتراک می گذاریم. مهندس داده. می توانید قسمت اول را بخوانید اینجا.

پرتو آپاچی و جریان داده برای خطوط لوله بلادرنگ

ما یک خط لوله پردازش داده های جریانی ایجاد می کنیم. قسمت 2

راه اندازی Google Cloud

توجه: من از Google Cloud Shell برای اجرای خط لوله و انتشار داده‌های گزارش سفارشی استفاده کردم زیرا در اجرای خط لوله در پایتون 3 با مشکل مواجه بودم. Google Cloud Shell از Python 2 استفاده می‌کند که با Apache Beam سازگارتر است.

برای شروع خط لوله، باید کمی تنظیمات را بررسی کنیم. برای آن دسته از شما که قبلاً از GCP استفاده نکرده‌اید، باید 6 مرحله زیر را که در این توضیح داده شده است دنبال کنید صفحه.

پس از این، ما باید اسکریپت های خود را در Google Cloud Storage آپلود کنیم و آنها را در Google Cloud Shel خود کپی کنیم. آپلود در فضای ذخیره سازی ابری کاملاً بی اهمیت است (توضیحات را می توان یافت اینجا). برای کپی کردن فایل‌هایمان، می‌توانیم Google Cloud Shel را از نوار ابزار با کلیک بر روی اولین نماد سمت چپ در شکل 2 زیر باز کنیم.

ما یک خط لوله پردازش داده های جریانی ایجاد می کنیم. قسمت 2
شکل 2

دستوراتی که برای کپی فایل ها و نصب کتابخانه های مورد نیاز نیاز داریم در زیر لیست شده است.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

ایجاد پایگاه داده و جدول ما

هنگامی که تمام مراحل مربوط به راه اندازی را کامل کردیم، کاری که باید انجام دهیم این است که یک مجموعه داده و جدول در BigQuery ایجاد کنیم. راه های مختلفی برای انجام این کار وجود دارد، اما ساده ترین آن استفاده از کنسول Google Cloud با ایجاد یک مجموعه داده است. می توانید مراحل زیر را دنبال کنید پیوندبرای ایجاد یک جدول با یک طرح. میز ما خواهد داشت 7 ستون، مربوط به اجزای هر گزارش کاربری است. برای راحتی، همه ستون ها را به جز متغیر timelocal به صورت رشته تعریف می کنیم و آنها را با توجه به متغیرهایی که قبلا ایجاد کردیم نام گذاری می کنیم. طرح جدول ما باید مانند شکل 3 باشد.

ما یک خط لوله پردازش داده های جریانی ایجاد می کنیم. قسمت 2
شکل 3. طرح جدول

انتشار داده های گزارش کاربری

Pub/Sub جزء حیاتی خط لوله ما است زیرا به چندین برنامه مستقل اجازه می دهد تا با یکدیگر ارتباط برقرار کنند. به طور خاص، به عنوان یک واسطه عمل می کند که به ما امکان می دهد پیام بین برنامه ها ارسال و دریافت کنیم. اولین کاری که باید انجام دهیم ایجاد یک موضوع است. به سادگی به Pub/Sub در کنسول بروید و روی CREATE TOPIC کلیک کنید.

کد زیر اسکریپت ما را فراخوانی می کند تا داده های گزارش تعریف شده در بالا را تولید کند و سپس گزارش ها را به Pub/Sub متصل و ارسال می کند. تنها کاری که باید انجام دهیم این است که یک شی بسازیم PublisherClient، مسیر موضوع را با استفاده از روش مشخص کنید topic_path و تابع را فراخوانی کنید publish с topic_path و داده ها لطفا توجه داشته باشید که ما وارد می کنیم generate_log_line از فیلمنامه ما stream_logs، بنابراین مطمئن شوید که این فایل ها در یک پوشه هستند، در غیر این صورت با خطای import مواجه می شوید. سپس می توانیم این را از طریق کنسول گوگل خود با استفاده از:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

همانطور که در شکل زیر نشان داده شده است، به محض اجرای فایل، می توانیم خروجی داده های لاگ به کنسول را مشاهده کنیم. این اسکریپت تا زمانی که ما استفاده نکنیم کار خواهد کرد CTRL + Cبرای تکمیل آن

ما یک خط لوله پردازش داده های جریانی ایجاد می کنیم. قسمت 2
شکل 4. خروجی publish_logs.py

نوشتن کد خط لوله ما

اکنون که همه چیز را آماده کرده ایم، می توانیم قسمت سرگرم کننده را شروع کنیم - کدگذاری خط لوله خود با استفاده از Beam و Python. برای ایجاد خط لوله Beam، باید یک آبجکت خط لوله (p) ایجاد کنیم. هنگامی که یک آبجکت خط لوله ایجاد کردیم، می توانیم چندین تابع را یکی پس از دیگری با استفاده از عملگر اعمال کنیم pipe (|). به طور کلی، گردش کار مانند تصویر زیر است.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

در کد ما دو تابع سفارشی ایجاد خواهیم کرد. تابع regex_clean، که داده ها را اسکن می کند و ردیف مربوطه را بر اساس لیست PATTERNS با استفاده از تابع بازیابی می کند. re.search. تابع یک رشته جدا شده با کاما برمی گرداند. اگر متخصص بیان منظم نیستید، توصیه می کنم این را بررسی کنید آموزش و در یک دفترچه یادداشت برای بررسی کدها تمرین کنید. پس از این یک تابع ParDo سفارشی به نام تعریف می کنیم شکاف، که یک تغییر از تبدیل پرتو برای پردازش موازی است. در پایتون، این کار به روش خاصی انجام می شود - ما باید کلاسی ایجاد کنیم که از کلاس DoFn Beam به ارث می رسد. تابع Split ردیف تجزیه شده را از تابع قبلی می گیرد و فهرستی از فرهنگ لغت را با کلیدهای مربوط به نام ستون ها در جدول BigQuery برمی گرداند. نکته ای در مورد این تابع وجود دارد که باید به آن توجه کرد: مجبور شدم وارد کنم datetime داخل یک تابع تا کار کند. من در ابتدای فایل با خطای import مواجه بودم که عجیب بود. سپس این لیست به تابع منتقل می شود WriteToBigQuery، که به سادگی داده های ما را به جدول اضافه می کند. کد Batch DataFlow Job و Streaming DataFlow Job در زیر آورده شده است. تنها تفاوت بین کد دسته ای و استریم این است که به صورت دسته ای ما CSV را از آن می خوانیم src_pathبا استفاده از تابع ReadFromText از پرتو.

Batch DataFlow Job (پردازش دسته ای)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

کار جریان داده جریان (پردازش جریان)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

راه اندازی نوار نقاله

ما می توانیم خط لوله را به روش های مختلف اجرا کنیم. اگر بخواهیم، ​​می‌توانیم آن را به صورت محلی از یک ترمینال و در حالی که از راه دور وارد GCP می‌شویم، اجرا کنیم.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

با این حال، ما قصد داریم آن را با استفاده از DataFlow اجرا کنیم. ما می توانیم این کار را با استفاده از دستور زیر با تنظیم پارامترهای مورد نیاز زیر انجام دهیم.

  • project - شناسه پروژه GCP شما.
  • runner اجرای خط لوله است که برنامه شما را تجزیه و تحلیل می کند و خط لوله شما را می سازد. برای اجرا در فضای ابری باید DataflowRunner را مشخص کنید.
  • staging_location - مسیر ذخیره سازی ابری Cloud Dataflow برای نمایه سازی بسته های کد مورد نیاز پردازنده هایی که کار را انجام می دهند.
  • temp_location - مسیر ذخیره سازی ابری Cloud Dataflow برای ذخیره فایل های شغلی موقت ایجاد شده در حین اجرای خط لوله.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

در حالی که این دستور اجرا می شود، می توانیم به تب DataFlow در کنسول گوگل برویم و خط لوله خود را مشاهده کنیم. وقتی روی خط لوله کلیک می کنیم، باید چیزی شبیه به شکل 4 ببینیم. برای اهداف اشکال زدایی، رفتن به Logs و سپس Stackdriver برای مشاهده گزارش های دقیق می تواند بسیار مفید باشد. این به من کمک کرد تا مشکلات مربوط به خط لوله را در تعدادی از موارد حل کنم.

ما یک خط لوله پردازش داده های جریانی ایجاد می کنیم. قسمت 2
شکل 4: نوار نقاله

به داده های ما در BigQuery دسترسی داشته باشید

بنابراین، ما از قبل باید خط لوله ای داشته باشیم که داده ها در جدول ما جاری است. برای آزمایش این، می‌توانیم به BigQuery برویم و به داده‌ها نگاه کنیم. پس از استفاده از دستور زیر باید چند ردیف اول مجموعه داده را ببینید. اکنون که داده‌های ذخیره شده در BigQuery را داریم، می‌توانیم تجزیه و تحلیل بیشتری انجام دهیم و همچنین داده‌ها را با همکاران به اشتراک بگذاریم و شروع به پاسخگویی به سؤالات تجاری کنیم.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

ما یک خط لوله پردازش داده های جریانی ایجاد می کنیم. قسمت 2
شکل 5: BigQuery

نتیجه

امیدواریم این پست به عنوان یک نمونه مفید برای ایجاد خط لوله داده های جریانی و همچنین یافتن راه هایی برای دسترسی بیشتر به داده ها باشد. ذخیره سازی داده ها در این فرمت مزایای زیادی به ما می دهد. اکنون می‌توانیم به سؤالات مهمی مانند چند نفر از محصول ما پاسخ دهیم؟ آیا پایگاه کاربران شما در طول زمان در حال افزایش است؟ افراد با چه جنبه های محصول بیشتر تعامل دارند؟ و آیا خطاهایی وجود دارد که نباید وجود داشته باشد؟ اینها سوالاتی هستند که مورد توجه سازمان خواهند بود. بر اساس بینش‌هایی که از پاسخ به این سؤالات به دست می‌آید، می‌توانیم محصول را بهبود بخشیم و تعامل کاربران را افزایش دهیم.

Beam واقعا برای این نوع تمرین مفید است و تعدادی موارد استفاده جالب دیگر نیز دارد. به عنوان مثال، ممکن است بخواهید داده‌های تیک سهام را در زمان واقعی تجزیه و تحلیل کنید و بر اساس تجزیه و تحلیل معاملات انجام دهید، شاید داده‌های حسگر از وسایل نقلیه دارید و می‌خواهید محاسبات سطح ترافیک را محاسبه کنید. برای مثال، می‌توانید یک شرکت بازی باشید که داده‌های کاربر را جمع‌آوری می‌کند و از آن برای ایجاد داشبورد برای ردیابی معیارهای کلیدی استفاده می‌کند. بسیار خوب، آقایان، این یک موضوع برای یک پست دیگر است، با تشکر از خواندن، و برای کسانی که می خواهند کد کامل را ببینند، در زیر لینک GitHub من است.

https://github.com/DFoly/User_log_pipeline

همه چیز هست قسمت اول را بخوانید.

منبع: www.habr.com

اضافه کردن نظر