سلام به همه. ترجمه قسمت پایانی مقاله را که به طور خاص برای دانشجویان دوره تهیه شده است را به اشتراک می گذاریم.
پرتو آپاچی و جریان داده برای خطوط لوله بلادرنگ
راه اندازی Google Cloud
توجه: من از Google Cloud Shell برای اجرای خط لوله و انتشار دادههای گزارش سفارشی استفاده کردم زیرا در اجرای خط لوله در پایتون 3 با مشکل مواجه بودم. Google Cloud Shell از Python 2 استفاده میکند که با Apache Beam سازگارتر است.
برای شروع خط لوله، باید کمی تنظیمات را بررسی کنیم. برای آن دسته از شما که قبلاً از GCP استفاده نکردهاید، باید 6 مرحله زیر را که در این توضیح داده شده است دنبال کنید
پس از این، ما باید اسکریپت های خود را در Google Cloud Storage آپلود کنیم و آنها را در Google Cloud Shel خود کپی کنیم. آپلود در فضای ذخیره سازی ابری کاملاً بی اهمیت است (توضیحات را می توان یافت
شکل 2
دستوراتی که برای کپی فایل ها و نصب کتابخانه های مورد نیاز نیاز داریم در زیر لیست شده است.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
ایجاد پایگاه داده و جدول ما
هنگامی که تمام مراحل مربوط به راه اندازی را کامل کردیم، کاری که باید انجام دهیم این است که یک مجموعه داده و جدول در BigQuery ایجاد کنیم. راه های مختلفی برای انجام این کار وجود دارد، اما ساده ترین آن استفاده از کنسول Google Cloud با ایجاد یک مجموعه داده است. می توانید مراحل زیر را دنبال کنید
شکل 3. طرح جدول
انتشار داده های گزارش کاربری
Pub/Sub جزء حیاتی خط لوله ما است زیرا به چندین برنامه مستقل اجازه می دهد تا با یکدیگر ارتباط برقرار کنند. به طور خاص، به عنوان یک واسطه عمل می کند که به ما امکان می دهد پیام بین برنامه ها ارسال و دریافت کنیم. اولین کاری که باید انجام دهیم ایجاد یک موضوع است. به سادگی به Pub/Sub در کنسول بروید و روی CREATE TOPIC کلیک کنید.
کد زیر اسکریپت ما را فراخوانی می کند تا داده های گزارش تعریف شده در بالا را تولید کند و سپس گزارش ها را به Pub/Sub متصل و ارسال می کند. تنها کاری که باید انجام دهیم این است که یک شی بسازیم PublisherClient، مسیر موضوع را با استفاده از روش مشخص کنید topic_path
و تابع را فراخوانی کنید publish
с topic_path
و داده ها لطفا توجه داشته باشید که ما وارد می کنیم generate_log_line
از فیلمنامه ما stream_logs
، بنابراین مطمئن شوید که این فایل ها در یک پوشه هستند، در غیر این صورت با خطای import مواجه می شوید. سپس می توانیم این را از طریق کنسول گوگل خود با استفاده از:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
همانطور که در شکل زیر نشان داده شده است، به محض اجرای فایل، می توانیم خروجی داده های لاگ به کنسول را مشاهده کنیم. این اسکریپت تا زمانی که ما استفاده نکنیم کار خواهد کرد CTRL + Cبرای تکمیل آن
شکل 4. خروجی publish_logs.py
نوشتن کد خط لوله ما
اکنون که همه چیز را آماده کرده ایم، می توانیم قسمت سرگرم کننده را شروع کنیم - کدگذاری خط لوله خود با استفاده از Beam و Python. برای ایجاد خط لوله Beam، باید یک آبجکت خط لوله (p) ایجاد کنیم. هنگامی که یک آبجکت خط لوله ایجاد کردیم، می توانیم چندین تابع را یکی پس از دیگری با استفاده از عملگر اعمال کنیم pipe (|)
. به طور کلی، گردش کار مانند تصویر زیر است.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
در کد ما دو تابع سفارشی ایجاد خواهیم کرد. تابع regex_clean
، که داده ها را اسکن می کند و ردیف مربوطه را بر اساس لیست PATTERNS با استفاده از تابع بازیابی می کند. re.search
. تابع یک رشته جدا شده با کاما برمی گرداند. اگر متخصص بیان منظم نیستید، توصیه می کنم این را بررسی کنید datetime
داخل یک تابع تا کار کند. من در ابتدای فایل با خطای import مواجه بودم که عجیب بود. سپس این لیست به تابع منتقل می شود WriteToBigQuery، که به سادگی داده های ما را به جدول اضافه می کند. کد Batch DataFlow Job و Streaming DataFlow Job در زیر آورده شده است. تنها تفاوت بین کد دسته ای و استریم این است که به صورت دسته ای ما CSV را از آن می خوانیم src_path
با استفاده از تابع ReadFromText
از پرتو.
Batch DataFlow Job (پردازش دسته ای)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
کار جریان داده جریان (پردازش جریان)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
راه اندازی نوار نقاله
ما می توانیم خط لوله را به روش های مختلف اجرا کنیم. اگر بخواهیم، میتوانیم آن را به صورت محلی از یک ترمینال و در حالی که از راه دور وارد GCP میشویم، اجرا کنیم.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
با این حال، ما قصد داریم آن را با استفاده از DataFlow اجرا کنیم. ما می توانیم این کار را با استفاده از دستور زیر با تنظیم پارامترهای مورد نیاز زیر انجام دهیم.
project
- شناسه پروژه GCP شما.runner
اجرای خط لوله است که برنامه شما را تجزیه و تحلیل می کند و خط لوله شما را می سازد. برای اجرا در فضای ابری باید DataflowRunner را مشخص کنید.staging_location
- مسیر ذخیره سازی ابری Cloud Dataflow برای نمایه سازی بسته های کد مورد نیاز پردازنده هایی که کار را انجام می دهند.temp_location
- مسیر ذخیره سازی ابری Cloud Dataflow برای ذخیره فایل های شغلی موقت ایجاد شده در حین اجرای خط لوله.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
در حالی که این دستور اجرا می شود، می توانیم به تب DataFlow در کنسول گوگل برویم و خط لوله خود را مشاهده کنیم. وقتی روی خط لوله کلیک می کنیم، باید چیزی شبیه به شکل 4 ببینیم. برای اهداف اشکال زدایی، رفتن به Logs و سپس Stackdriver برای مشاهده گزارش های دقیق می تواند بسیار مفید باشد. این به من کمک کرد تا مشکلات مربوط به خط لوله را در تعدادی از موارد حل کنم.
شکل 4: نوار نقاله
به داده های ما در BigQuery دسترسی داشته باشید
بنابراین، ما از قبل باید خط لوله ای داشته باشیم که داده ها در جدول ما جاری است. برای آزمایش این، میتوانیم به BigQuery برویم و به دادهها نگاه کنیم. پس از استفاده از دستور زیر باید چند ردیف اول مجموعه داده را ببینید. اکنون که دادههای ذخیره شده در BigQuery را داریم، میتوانیم تجزیه و تحلیل بیشتری انجام دهیم و همچنین دادهها را با همکاران به اشتراک بگذاریم و شروع به پاسخگویی به سؤالات تجاری کنیم.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
شکل 5: BigQuery
نتیجه
امیدواریم این پست به عنوان یک نمونه مفید برای ایجاد خط لوله داده های جریانی و همچنین یافتن راه هایی برای دسترسی بیشتر به داده ها باشد. ذخیره سازی داده ها در این فرمت مزایای زیادی به ما می دهد. اکنون میتوانیم به سؤالات مهمی مانند چند نفر از محصول ما پاسخ دهیم؟ آیا پایگاه کاربران شما در طول زمان در حال افزایش است؟ افراد با چه جنبه های محصول بیشتر تعامل دارند؟ و آیا خطاهایی وجود دارد که نباید وجود داشته باشد؟ اینها سوالاتی هستند که مورد توجه سازمان خواهند بود. بر اساس بینشهایی که از پاسخ به این سؤالات به دست میآید، میتوانیم محصول را بهبود بخشیم و تعامل کاربران را افزایش دهیم.
Beam واقعا برای این نوع تمرین مفید است و تعدادی موارد استفاده جالب دیگر نیز دارد. به عنوان مثال، ممکن است بخواهید دادههای تیک سهام را در زمان واقعی تجزیه و تحلیل کنید و بر اساس تجزیه و تحلیل معاملات انجام دهید، شاید دادههای حسگر از وسایل نقلیه دارید و میخواهید محاسبات سطح ترافیک را محاسبه کنید. برای مثال، میتوانید یک شرکت بازی باشید که دادههای کاربر را جمعآوری میکند و از آن برای ایجاد داشبورد برای ردیابی معیارهای کلیدی استفاده میکند. بسیار خوب، آقایان، این یک موضوع برای یک پست دیگر است، با تشکر از خواندن، و برای کسانی که می خواهند کد کامل را ببینند، در زیر لینک GitHub من است.
همه چیز هست
منبع: www.habr.com