نقوم بإنشاء خط أنابيب لتدفق معالجة البيانات. الجزء 2

أهلاً بكم. نشارك ترجمة الجزء الأخير من المقالة ، المُعد خصيصًا لطلاب الدورة مهندس بيانات. يمكن العثور على الجزء الأول هنا.

Apache Beam و DataFlow لخطوط الأنابيب في الوقت الفعلي

نقوم بإنشاء خط أنابيب لتدفق معالجة البيانات. الجزء 2

إعداد جوجل كلاود

ملاحظة: لقد استخدمت Google Cloud Shell لتشغيل خط الأنابيب ونشر بيانات سجل المستخدمين لأنني كنت أواجه مشكلة في تشغيل خط الأنابيب في Python 3. يستخدم Google Cloud Shell Python 2 ، وهو أكثر اتساقًا مع Apache Beam.

للحصول على خط الأنابيب وتشغيله ، نحتاج إلى التعمق قليلاً في الإعدادات. بالنسبة لأولئك الذين لم يستخدموا برنامج "شركاء Google المعتمدون" من قبل ، يلزمك إكمال الخطوات الست التالية في هذا صفحة.

بعد ذلك ، سنحتاج إلى تحميل البرامج النصية الخاصة بنا على Google Cloud Storage ونسخها إلى Google Cloud Shel. التحميل إلى التخزين السحابي بسيط للغاية (يمكن العثور على الوصف هنا). لنسخ ملفاتنا ، يمكننا فتح Google Cloud Shel من شريط الأدوات بالنقر فوق الرمز الأول على اليسار في الشكل 2 أدناه.

نقوم بإنشاء خط أنابيب لتدفق معالجة البيانات. الجزء 2
الرقم 2

الأوامر التي نحتاجها لنسخ الملفات وتثبيت المكتبات المطلوبة مذكورة أدناه.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

إنشاء قاعدة البيانات والجدول لدينا

بمجرد الانتهاء من جميع خطوات الإعداد ، فإن الشيء التالي الذي يتعين علينا القيام به هو إنشاء مجموعة بيانات وجدول في BigQuery. هناك عدة طرق للقيام بذلك ، ولكن أبسطها هو استخدام Google Cloud console عن طريق إنشاء مجموعة بيانات أولاً. يمكنك اتباع الخطوات التالية صلةلإنشاء جدول بالمخطط. سيكون لدينا طاولتنا 7 أعمدة، المقابلة لمكونات كل سجل مستخدم. للراحة ، سنقوم بتعريف جميع الأعمدة كسلاسل (من نوع سلسلة) ، باستثناء المتغير المحلي timelocal ، ونقوم بتسميتها وفقًا للمتغيرات التي أنشأناها سابقًا. يجب أن يبدو تخطيط طاولتنا مثل الشكل 3.

نقوم بإنشاء خط أنابيب لتدفق معالجة البيانات. الجزء 2
الشكل 3. مخطط الجدول

نشر بيانات سجل المستخدم

يعد Pub / Sub مكونًا مهمًا في خط الأنابيب لدينا لأنه يسمح للعديد من التطبيقات المستقلة بالتواصل مع بعضها البعض. على وجه التحديد ، يعمل كوسيط يسمح لنا بإرسال واستقبال الرسائل بين التطبيقات. أول شيء علينا القيام به هو إنشاء موضوع (موضوع). ما عليك سوى الانتقال إلى Pub / Sub في وحدة التحكم والنقر فوق إنشاء موضوع.

يستدعي الكود أدناه البرنامج النصي الخاص بنا لإنشاء بيانات السجل المحددة أعلاه ثم يتصل ويرسل السجلات إلى Pub / Sub. الشيء الوحيد الذي يتعين علينا القيام به هو إنشاء كائن الناشر العميل، حدد مسار الموضوع باستخدام الطريقة topic_path واستدعاء الوظيفة publish с topic_path والبيانات. يرجى ملاحظة أننا نستورد generate_log_line من نصنا stream_logsلذا تأكد من وجود هذه الملفات في نفس المجلد وإلا فستتلقى خطأ استيراد. يمكننا بعد ذلك تشغيل هذا من خلال وحدة تحكم google الخاصة بنا باستخدام:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

بمجرد تشغيل الملف ، سنتمكن من مراقبة إخراج بيانات السجل إلى وحدة التحكم ، كما هو موضح في الشكل أدناه. سيعمل هذا البرنامج النصي طالما أننا لا نستخدمه CTRL + Cلإكماله.

نقوم بإنشاء خط أنابيب لتدفق معالجة البيانات. الجزء 2
الشكل 4. الخلاصة publish_logs.py

كتابة كود خط الأنابيب لدينا

الآن بعد أن تم إعداد كل شيء ، يمكننا الوصول إلى الجزء الممتع - ترميز خط الأنابيب الخاص بنا باستخدام Beam و Python. لإنشاء خط أنابيب شعاع ، نحتاج إلى إنشاء كائن خط أنابيب (ع). بمجرد إنشاء كائن خط أنابيب ، يمكننا تطبيق وظائف متعددة واحدة تلو الأخرى باستخدام عامل التشغيل pipe (|). بشكل عام ، يبدو سير العمل مثل الشكل أدناه.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

في الكود الخاص بنا ، سننشئ وظيفتين محددتين من قبل المستخدم. وظيفة regex_clean، الذي يقوم بمسح البيانات واستخراج سلسلة المطابقة بناءً على قائمة الأنماط باستخدام الوظيفة re.search. تقوم الدالة بإرجاع سلسلة مفصولة بفواصل. إذا لم تكن خبيرًا في التعبيرات العادية ، فإنني أوصي بمراجعة ذلك. درس تعليمي وممارسة في المفكرة لاختبار الكود. ثم نحدد وظيفة ParDo مخصصة تسمى الانقسام، وهو أحد أشكال تحويل الشعاع للمعالجة المتوازية. في Python ، يتم ذلك بطريقة خاصة - يجب علينا إنشاء فئة ترث من فئة DoFn Beam. تأخذ وظيفة Split السلسلة التي تم تحليلها من الوظيفة السابقة وتعيد قائمة بالقواميس مع مفاتيح تتوافق مع أسماء الأعمدة في جدول BigQuery. هناك شيء يجب ملاحظته حول هذه الوظيفة: اضطررت إلى الاستيراد datetime داخل الوظيفة لجعلها تعمل. لقد تلقيت خطأً عند الاستيراد في بداية الملف ، وهو أمر غريب. ثم يتم تمرير هذه القائمة إلى الوظيفة الكتابة إلى BigQuery، والتي تضيف بياناتنا ببساطة إلى الجدول. يرد أدناه رمز Batch DataFlow Job و Streaming DataFlow Job. الاختلاف الوحيد بين الكود الدفعي والدفق هو أننا نقرأ ملف CSV من الدُفعة src_pathباستخدام الوظيفة ReadFromText من بيم.

دفعة DataFlow المهمة (معالجة الدُفعات)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

وظيفة تدفق البيانات (معالجة التدفق)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

تشغيل خط الأنابيب

يمكننا أن نبدأ خط الأنابيب بعدة طرق مختلفة. إذا أردنا ذلك ، يمكننا فقط تشغيله محليًا من محطة طرفية عن طريق تسجيل الدخول إلى GCP عن بُعد.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

ومع ذلك ، سنقوم بتشغيله باستخدام DataFlow. يمكننا القيام بذلك باستخدام الأمر أدناه عن طريق تعيين المعلمات المطلوبة التالية.

  • project - معرّف مشروع GCP الخاص بك.
  • runner هو عداء خط أنابيب يقوم بتحليل برنامجك وإنشاء خط الأنابيب الخاص بك. للتشغيل في السحابة ، يجب عليك تحديد DataflowRunner.
  • staging_location هو المسار إلى التخزين السحابي Cloud Dataflow لفهرسة حزم التعليمات البرمجية المطلوبة من قبل المعالجات التي تؤدي العمل.
  • temp_location - المسار إلى التخزين السحابي Cloud Dataflow لوضع ملفات الوظائف المؤقتة التي تم إنشاؤها أثناء تشغيل خط الأنابيب.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

أثناء تشغيل هذا الأمر ، يمكننا الانتقال إلى علامة التبويب DataFlow في وحدة تحكم google وعرض خط الأنابيب الخاص بنا. من خلال النقر على خط الأنابيب ، يجب أن نرى شيئًا مشابهًا للشكل 4. لأغراض تصحيح الأخطاء ، قد يكون من المفيد جدًا الانتقال إلى السجلات ثم إلى Stackdriver لمشاهدة السجلات التفصيلية. لقد ساعدني هذا في حل مشاكل خط الأنابيب في عدد من الحالات.

نقوم بإنشاء خط أنابيب لتدفق معالجة البيانات. الجزء 2
الشكل 4: خط أنابيب الشعاع

الوصول إلى بياناتنا في BigQuery

لذلك ، يجب أن يكون لدينا بالفعل خط أنابيب يعمل بالبيانات الواردة إلى طاولتنا. لاختبار ذلك ، يمكننا الانتقال إلى BigQuery وعرض البيانات. بعد استخدام الأمر أدناه ، سترى الأسطر القليلة الأولى من مجموعة البيانات. الآن وبعد أن أصبح لدينا البيانات مخزنة في BigQuery ، يمكننا إجراء مزيد من التحليل بالإضافة إلى مشاركة البيانات مع الزملاء والبدء في الإجابة على أسئلة العمل.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

نقوم بإنشاء خط أنابيب لتدفق معالجة البيانات. الجزء 2
الشكل 5: BigQuery

اختتام

نأمل أن يكون هذا المنشور بمثابة مثال مفيد لبناء خط تدفق البيانات ، وكذلك إيجاد طرق لجعل البيانات أكثر سهولة. يمنحنا تخزين البيانات بهذا التنسيق العديد من المزايا. يمكننا الآن البدء في الإجابة على أسئلة مهمة مثل عدد الأشخاص الذين يستخدمون منتجنا؟ هل تنمو قاعدة المستخدمين بمرور الوقت؟ ما هي جوانب المنتج التي يتفاعل معها الأشخاص أكثر من غيرها؟ وهل هناك أخطاء لا ينبغي أن تكون؟ هذه هي الأسئلة التي ستهم المنظمة. استنادًا إلى الرؤى المستمدة من الإجابات على هذه الأسئلة ، سنتمكن من تحسين المنتج وزيادة تفاعل المستخدم.

يعتبر Beam مفيدًا حقًا لهذا النوع من التمارين ولديه أيضًا عدد من حالات الاستخدام الأخرى المثيرة للاهتمام. على سبيل المثال ، يمكنك تحليل بيانات مؤشر الأسهم في الوقت الفعلي وإجراء الصفقات بناءً على التحليل ، ربما لديك بيانات مستشعر قادمة من المركبات وتريد حساب حساب مستوى حركة المرور. يمكنك أيضًا ، على سبيل المثال ، أن تكون شركة ألعاب تجمع بيانات المستخدم وتستخدمها لإنشاء لوحات معلومات لتتبع المقاييس الرئيسية. حسنًا ، أيها السادة ، هذا موضوع لمنشور آخر ، شكرًا على القراءة ، وبالنسبة لأولئك الذين يريدون رؤية الكود الكامل ، يوجد أدناه رابط إلى GitHub الخاص بي.

https://github.com/DFoly/User_log_pipeline

هذا كل شئ اقرأ الجزء الأول.

المصدر: www.habr.com

إضافة تعليق