أهلاً بكم. نشارك ترجمة الجزء الأخير من المقالة ، المُعد خصيصًا لطلاب الدورة
Apache Beam و DataFlow لخطوط الأنابيب في الوقت الفعلي
إعداد جوجل كلاود
ملاحظة: لقد استخدمت Google Cloud Shell لتشغيل خط الأنابيب ونشر بيانات سجل المستخدمين لأنني كنت أواجه مشكلة في تشغيل خط الأنابيب في Python 3. يستخدم Google Cloud Shell Python 2 ، وهو أكثر اتساقًا مع Apache Beam.
للحصول على خط الأنابيب وتشغيله ، نحتاج إلى التعمق قليلاً في الإعدادات. بالنسبة لأولئك الذين لم يستخدموا برنامج "شركاء Google المعتمدون" من قبل ، يلزمك إكمال الخطوات الست التالية في هذا
بعد ذلك ، سنحتاج إلى تحميل البرامج النصية الخاصة بنا على Google Cloud Storage ونسخها إلى Google Cloud Shel. التحميل إلى التخزين السحابي بسيط للغاية (يمكن العثور على الوصف
الرقم 2
الأوامر التي نحتاجها لنسخ الملفات وتثبيت المكتبات المطلوبة مذكورة أدناه.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
إنشاء قاعدة البيانات والجدول لدينا
بمجرد الانتهاء من جميع خطوات الإعداد ، فإن الشيء التالي الذي يتعين علينا القيام به هو إنشاء مجموعة بيانات وجدول في BigQuery. هناك عدة طرق للقيام بذلك ، ولكن أبسطها هو استخدام Google Cloud console عن طريق إنشاء مجموعة بيانات أولاً. يمكنك اتباع الخطوات التالية
الشكل 3. مخطط الجدول
نشر بيانات سجل المستخدم
يعد Pub / Sub مكونًا مهمًا في خط الأنابيب لدينا لأنه يسمح للعديد من التطبيقات المستقلة بالتواصل مع بعضها البعض. على وجه التحديد ، يعمل كوسيط يسمح لنا بإرسال واستقبال الرسائل بين التطبيقات. أول شيء علينا القيام به هو إنشاء موضوع (موضوع). ما عليك سوى الانتقال إلى Pub / Sub في وحدة التحكم والنقر فوق إنشاء موضوع.
يستدعي الكود أدناه البرنامج النصي الخاص بنا لإنشاء بيانات السجل المحددة أعلاه ثم يتصل ويرسل السجلات إلى Pub / Sub. الشيء الوحيد الذي يتعين علينا القيام به هو إنشاء كائن الناشر العميل، حدد مسار الموضوع باستخدام الطريقة topic_path
واستدعاء الوظيفة publish
с topic_path
والبيانات. يرجى ملاحظة أننا نستورد generate_log_line
من نصنا stream_logs
لذا تأكد من وجود هذه الملفات في نفس المجلد وإلا فستتلقى خطأ استيراد. يمكننا بعد ذلك تشغيل هذا من خلال وحدة تحكم google الخاصة بنا باستخدام:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
بمجرد تشغيل الملف ، سنتمكن من مراقبة إخراج بيانات السجل إلى وحدة التحكم ، كما هو موضح في الشكل أدناه. سيعمل هذا البرنامج النصي طالما أننا لا نستخدمه CTRL + Cلإكماله.
الشكل 4. الخلاصة publish_logs.py
كتابة كود خط الأنابيب لدينا
الآن بعد أن تم إعداد كل شيء ، يمكننا الوصول إلى الجزء الممتع - ترميز خط الأنابيب الخاص بنا باستخدام Beam و Python. لإنشاء خط أنابيب شعاع ، نحتاج إلى إنشاء كائن خط أنابيب (ع). بمجرد إنشاء كائن خط أنابيب ، يمكننا تطبيق وظائف متعددة واحدة تلو الأخرى باستخدام عامل التشغيل pipe (|)
. بشكل عام ، يبدو سير العمل مثل الشكل أدناه.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
في الكود الخاص بنا ، سننشئ وظيفتين محددتين من قبل المستخدم. وظيفة regex_clean
، الذي يقوم بمسح البيانات واستخراج سلسلة المطابقة بناءً على قائمة الأنماط باستخدام الوظيفة re.search
. تقوم الدالة بإرجاع سلسلة مفصولة بفواصل. إذا لم تكن خبيرًا في التعبيرات العادية ، فإنني أوصي بمراجعة ذلك. datetime
داخل الوظيفة لجعلها تعمل. لقد تلقيت خطأً عند الاستيراد في بداية الملف ، وهو أمر غريب. ثم يتم تمرير هذه القائمة إلى الوظيفة الكتابة إلى BigQuery، والتي تضيف بياناتنا ببساطة إلى الجدول. يرد أدناه رمز Batch DataFlow Job و Streaming DataFlow Job. الاختلاف الوحيد بين الكود الدفعي والدفق هو أننا نقرأ ملف CSV من الدُفعة src_path
باستخدام الوظيفة ReadFromText
من بيم.
دفعة DataFlow المهمة (معالجة الدُفعات)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
وظيفة تدفق البيانات (معالجة التدفق)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
تشغيل خط الأنابيب
يمكننا أن نبدأ خط الأنابيب بعدة طرق مختلفة. إذا أردنا ذلك ، يمكننا فقط تشغيله محليًا من محطة طرفية عن طريق تسجيل الدخول إلى GCP عن بُعد.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
ومع ذلك ، سنقوم بتشغيله باستخدام DataFlow. يمكننا القيام بذلك باستخدام الأمر أدناه عن طريق تعيين المعلمات المطلوبة التالية.
project
- معرّف مشروع GCP الخاص بك.runner
هو عداء خط أنابيب يقوم بتحليل برنامجك وإنشاء خط الأنابيب الخاص بك. للتشغيل في السحابة ، يجب عليك تحديد DataflowRunner.staging_location
هو المسار إلى التخزين السحابي Cloud Dataflow لفهرسة حزم التعليمات البرمجية المطلوبة من قبل المعالجات التي تؤدي العمل.temp_location
- المسار إلى التخزين السحابي Cloud Dataflow لوضع ملفات الوظائف المؤقتة التي تم إنشاؤها أثناء تشغيل خط الأنابيب.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
أثناء تشغيل هذا الأمر ، يمكننا الانتقال إلى علامة التبويب DataFlow في وحدة تحكم google وعرض خط الأنابيب الخاص بنا. من خلال النقر على خط الأنابيب ، يجب أن نرى شيئًا مشابهًا للشكل 4. لأغراض تصحيح الأخطاء ، قد يكون من المفيد جدًا الانتقال إلى السجلات ثم إلى Stackdriver لمشاهدة السجلات التفصيلية. لقد ساعدني هذا في حل مشاكل خط الأنابيب في عدد من الحالات.
الشكل 4: خط أنابيب الشعاع
الوصول إلى بياناتنا في BigQuery
لذلك ، يجب أن يكون لدينا بالفعل خط أنابيب يعمل بالبيانات الواردة إلى طاولتنا. لاختبار ذلك ، يمكننا الانتقال إلى BigQuery وعرض البيانات. بعد استخدام الأمر أدناه ، سترى الأسطر القليلة الأولى من مجموعة البيانات. الآن وبعد أن أصبح لدينا البيانات مخزنة في BigQuery ، يمكننا إجراء مزيد من التحليل بالإضافة إلى مشاركة البيانات مع الزملاء والبدء في الإجابة على أسئلة العمل.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
الشكل 5: BigQuery
اختتام
نأمل أن يكون هذا المنشور بمثابة مثال مفيد لبناء خط تدفق البيانات ، وكذلك إيجاد طرق لجعل البيانات أكثر سهولة. يمنحنا تخزين البيانات بهذا التنسيق العديد من المزايا. يمكننا الآن البدء في الإجابة على أسئلة مهمة مثل عدد الأشخاص الذين يستخدمون منتجنا؟ هل تنمو قاعدة المستخدمين بمرور الوقت؟ ما هي جوانب المنتج التي يتفاعل معها الأشخاص أكثر من غيرها؟ وهل هناك أخطاء لا ينبغي أن تكون؟ هذه هي الأسئلة التي ستهم المنظمة. استنادًا إلى الرؤى المستمدة من الإجابات على هذه الأسئلة ، سنتمكن من تحسين المنتج وزيادة تفاعل المستخدم.
يعتبر Beam مفيدًا حقًا لهذا النوع من التمارين ولديه أيضًا عدد من حالات الاستخدام الأخرى المثيرة للاهتمام. على سبيل المثال ، يمكنك تحليل بيانات مؤشر الأسهم في الوقت الفعلي وإجراء الصفقات بناءً على التحليل ، ربما لديك بيانات مستشعر قادمة من المركبات وتريد حساب حساب مستوى حركة المرور. يمكنك أيضًا ، على سبيل المثال ، أن تكون شركة ألعاب تجمع بيانات المستخدم وتستخدمها لإنشاء لوحات معلومات لتتبع المقاييس الرئيسية. حسنًا ، أيها السادة ، هذا موضوع لمنشور آخر ، شكرًا على القراءة ، وبالنسبة لأولئك الذين يريدون رؤية الكود الكامل ، يوجد أدناه رابط إلى GitHub الخاص بي.
هذا كل شئ
المصدر: www.habr.com