ہم ایک اسٹریم ڈیٹا پروسیسنگ پائپ لائن بناتے ہیں۔ حصہ 2

سب کو سلام. ہم مضمون کے آخری حصے کا ترجمہ شیئر کر رہے ہیں، جو خاص طور پر کورس کے طلباء کے لیے تیار کیا گیا ہے۔ ڈیٹا انجینئر. آپ پہلا حصہ پڑھ سکتے ہیں۔ یہاں.

ریئل ٹائم پائپ لائنز کے لیے اپاچی بیم اور ڈیٹا فلو

ہم ایک اسٹریم ڈیٹا پروسیسنگ پائپ لائن بناتے ہیں۔ حصہ 2

گوگل کلاؤڈ ترتیب دیا جا رہا ہے۔

نوٹ: میں نے پائپ لائن چلانے اور حسب ضرورت لاگ ڈیٹا شائع کرنے کے لیے گوگل کلاؤڈ شیل کا استعمال کیا کیونکہ مجھے Python 3 میں پائپ لائن چلانے میں دشواری ہو رہی تھی۔ Google Cloud Shell Python 2 کا استعمال کرتا ہے، جو Apache Beam کے ساتھ زیادہ مطابقت رکھتا ہے۔

پائپ لائن شروع کرنے کے لئے، ہمیں ترتیبات میں تھوڑا سا کھودنے کی ضرورت ہے۔ آپ میں سے ان لوگوں کے لیے جنہوں نے پہلے GCP استعمال نہیں کیا ہے، آپ کو اس میں بیان کردہ مندرجہ ذیل 6 مراحل پر عمل کرنے کی ضرورت ہوگی۔ صفحہ.

اس کے بعد، ہمیں اپنے اسکرپٹس کو گوگل کلاؤڈ اسٹوریج پر اپ لوڈ کرنے اور انہیں اپنے گوگل کلاؤڈ شیل میں کاپی کرنے کی ضرورت ہوگی۔ کلاؤڈ اسٹوریج پر اپ لوڈ کرنا بہت معمولی بات ہے (تفصیل مل سکتی ہے۔ یہاں)۔ اپنی فائلوں کو کاپی کرنے کے لیے، ہم نیچے تصویر 2 میں بائیں جانب پہلے آئیکن پر کلک کرکے ٹول بار سے گوگل کلاؤڈ شیل کھول سکتے ہیں۔

ہم ایک اسٹریم ڈیٹا پروسیسنگ پائپ لائن بناتے ہیں۔ حصہ 2
ساخت، پیکر 2

فائلوں کو کاپی کرنے اور مطلوبہ لائبریریوں کو انسٹال کرنے کے لیے ہمیں جن کمانڈز کی ضرورت ہے وہ ذیل میں درج ہیں۔

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

ہمارا ڈیٹا بیس اور ٹیبل بنانا

ایک بار جب ہم سیٹ اپ سے متعلق تمام مراحل مکمل کر لیتے ہیں، تو اگلا کام جو ہمیں کرنا ہے وہ ہے BigQuery میں ڈیٹا سیٹ اور ٹیبل بنانا۔ ایسا کرنے کے کئی طریقے ہیں، لیکن سب سے آسان یہ ہے کہ پہلے ڈیٹا سیٹ بنا کر گوگل کلاؤڈ کنسول استعمال کریں۔ آپ ذیل کے مراحل پر عمل کر سکتے ہیں۔ لنکایک سکیما کے ساتھ ایک میز بنانے کے لئے. ہماری میز پڑے گی۔ 7 کالم، ہر صارف لاگ کے اجزاء کے مطابق۔ سہولت کے لیے، ہم تمام کالموں کو سٹرنگز کے طور پر متعین کریں گے، سوائے ٹائم لوکل متغیر کے، اور ان کا نام ان متغیرات کے مطابق رکھیں گے جو ہم نے پہلے تیار کیے تھے۔ ہمارے ٹیبل کی ترتیب تصویر 3 کی طرح نظر آنی چاہئے۔

ہم ایک اسٹریم ڈیٹا پروسیسنگ پائپ لائن بناتے ہیں۔ حصہ 2
شکل 3. ٹیبل لے آؤٹ

صارف لاگ ڈیٹا شائع کرنا

Pub/Sub ہماری پائپ لائن کا ایک اہم جزو ہے کیونکہ یہ متعدد آزاد ایپلی کیشنز کو ایک دوسرے کے ساتھ بات چیت کرنے کی اجازت دیتا ہے۔ خاص طور پر، یہ ایک بیچوان کے طور پر کام کرتا ہے جو ہمیں ایپلیکیشنز کے درمیان پیغامات بھیجنے اور وصول کرنے کی اجازت دیتا ہے۔ سب سے پہلے ہمیں ایک موضوع بنانے کی ضرورت ہے۔ کنسول میں بس Pub/Sub پر جائیں اور CREATE TOPIC پر کلک کریں۔

نیچے دیا گیا کوڈ اوپر بیان کردہ لاگ ڈیٹا تیار کرنے کے لیے ہماری اسکرپٹ کو کال کرتا ہے اور پھر لاگز کو Pub/Sub کو جوڑ کر بھیجتا ہے۔ ہمیں صرف ایک چیز بنانے کی ضرورت ہے۔ پبلشر کلائنٹ، طریقہ کا استعمال کرتے ہوئے موضوع کے راستے کی وضاحت کریں۔ topic_path اور فنکشن کو کال کریں۔ publish с topic_path اور ڈیٹا. براہ کرم نوٹ کریں کہ ہم درآمد کرتے ہیں۔ generate_log_line ہمارے اسکرپٹ سے stream_logs، لہذا یقینی بنائیں کہ یہ فائلیں ایک ہی فولڈر میں ہیں، بصورت دیگر آپ کو درآمد کی خرابی ملے گی۔ اس کے بعد ہم اسے اپنے گوگل کنسول کے ذریعے چلا سکتے ہیں:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

جیسے ہی فائل چلتی ہے، ہم کنسول میں لاگ ڈیٹا کی آؤٹ پٹ دیکھ سکیں گے، جیسا کہ نیچے دی گئی تصویر میں دکھایا گیا ہے۔ یہ اسکرپٹ تب تک کام کرے گا جب تک ہم استعمال نہیں کرتے CTRL + Cاسے مکمل کرنے کے لیے.

ہم ایک اسٹریم ڈیٹا پروسیسنگ پائپ لائن بناتے ہیں۔ حصہ 2
شکل 4. آؤٹ پٹ publish_logs.py

ہمارا پائپ لائن کوڈ لکھنا

اب جب کہ ہمارے پاس سب کچھ تیار ہے، ہم تفریحی حصہ شروع کر سکتے ہیں - بیم اور ازگر کا استعمال کرتے ہوئے اپنی پائپ لائن کو کوڈنگ کرنا۔ بیم پائپ لائن بنانے کے لیے، ہمیں پائپ لائن آبجیکٹ (p) بنانے کی ضرورت ہے۔ ایک بار جب ہم پائپ لائن آبجیکٹ بنا لیتے ہیں، تو ہم آپریٹر کا استعمال کرتے ہوئے ایک کے بعد ایک متعدد فنکشن لگا سکتے ہیں۔ pipe (|). عام طور پر، ورک فلو نیچے کی تصویر کی طرح لگتا ہے۔

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

ہمارے کوڈ میں، ہم دو کسٹم فنکشنز بنائیں گے۔ فنکشن regex_clean، جو ڈیٹا کو اسکین کرتا ہے اور فنکشن کا استعمال کرتے ہوئے PATTERNS فہرست کی بنیاد پر متعلقہ قطار کو بازیافت کرتا ہے۔ re.search. فنکشن کوما سے الگ کردہ سٹرنگ لوٹاتا ہے۔ اگر آپ ریگولر ایکسپریشن کے ماہر نہیں ہیں تو میں اسے چیک کرنے کی تجویز کرتا ہوں۔ سبق اور کوڈ چیک کرنے کے لیے نوٹ پیڈ میں مشق کریں۔ اس کے بعد ہم ایک کسٹم ParDo فنکشن کی وضاحت کرتے ہیں جسے کہتے ہیں۔ سپلٹجو کہ متوازی پروسیسنگ کے لیے بیم ٹرانسفارم کی تبدیلی ہے۔ Python میں، یہ ایک خاص طریقے سے کیا جاتا ہے - ہمیں ایک ایسی کلاس بنانا چاہیے جو DoFn بیم کلاس سے وراثت میں ملتی ہو۔ سپلٹ فنکشن پچھلے فنکشن سے پارس شدہ قطار لیتا ہے اور ہمارے BigQuery ٹیبل میں کالم کے ناموں سے متعلق کلیدوں کے ساتھ لغات کی فہرست لوٹاتا ہے۔ اس فنکشن کے بارے میں نوٹ کرنے کے لئے کچھ ہے: مجھے درآمد کرنا پڑا datetime اسے کام کرنے کے لیے فنکشن کے اندر۔ مجھے فائل کے آغاز میں ایک درآمدی غلطی ہو رہی تھی، جو عجیب تھی۔ اس کے بعد یہ فہرست فنکشن کو بھیج دی جاتی ہے۔ ToBigQuery لکھیں۔، جو صرف ہمارے ڈیٹا کو ٹیبل میں شامل کرتا ہے۔ بیچ ڈیٹا فلو جاب اور سٹریمنگ ڈیٹا فلو جاب کا کوڈ ذیل میں دیا گیا ہے۔ بیچ اور اسٹریمنگ کوڈ کے درمیان فرق صرف یہ ہے کہ بیچ میں ہم CSV سے پڑھتے ہیں۔ src_pathفنکشن کا استعمال کرتے ہوئے ReadFromText بیم سے

بیچ ڈیٹا فلو جاب (بیچ پروسیسنگ)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

سٹریمنگ ڈیٹا فلو جاب (سٹریم پروسیسنگ)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

کنویئر شروع کرنا

ہم پائپ لائن کو کئی مختلف طریقوں سے چلا سکتے ہیں۔ اگر ہم چاہیں تو، ہم اسے مقامی طور پر ٹرمینل سے چلا سکتے ہیں جب کہ GCP میں دور سے لاگ ان ہوں۔

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

تاہم، ہم اسے ڈیٹا فلو کا استعمال کرتے ہوئے چلانے جا رہے ہیں۔ ہم درج ذیل مطلوبہ پیرامیٹرز کو ترتیب دے کر نیچے دی گئی کمانڈ کا استعمال کر کے یہ کر سکتے ہیں۔

  • project - آپ کے GCP پروجیکٹ کی ID۔
  • runner ایک پائپ لائن رنر ہے جو آپ کے پروگرام کا تجزیہ کرے گا اور آپ کی پائپ لائن تعمیر کرے گا۔ کلاؤڈ میں چلانے کے لیے، آپ کو ڈیٹا فلو رنر کی وضاحت کرنی ہوگی۔
  • staging_location - کلاؤڈ ڈیٹا فلو کلاؤڈ اسٹوریج کا راستہ انڈیکسنگ کوڈ پیکجوں کے لیے جو کام کرنے والے پروسیسرز کو درکار ہے۔
  • temp_location - پائپ لائن کے چلنے کے دوران تخلیق کردہ عارضی ملازمت کی فائلوں کو ذخیرہ کرنے کے لیے کلاؤڈ ڈیٹا فلو کلاؤڈ اسٹوریج کا راستہ۔
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

جب یہ کمانڈ چل رہی ہو، ہم گوگل کنسول میں ڈیٹا فلو ٹیب پر جا کر اپنی پائپ لائن دیکھ سکتے ہیں۔ جب ہم پائپ لائن پر کلک کرتے ہیں، تو ہمیں شکل 4 سے ملتا جلتا کچھ نظر آنا چاہیے۔ ڈیبگنگ کے مقاصد کے لیے، تفصیلی لاگز دیکھنے کے لیے Logs اور پھر Stackdriver پر جانا بہت مددگار ثابت ہو سکتا ہے۔ اس سے مجھے کئی معاملات میں پائپ لائن کے مسائل حل کرنے میں مدد ملی ہے۔

ہم ایک اسٹریم ڈیٹا پروسیسنگ پائپ لائن بناتے ہیں۔ حصہ 2
شکل 4: بیم کنویئر

BigQuery میں ہمارے ڈیٹا تک رسائی حاصل کریں۔

لہذا، ہمارے پاس پہلے سے ہی ایک پائپ لائن چلنی چاہئے جس میں ڈیٹا ہمارے ٹیبل میں بہہ رہا ہو۔ اس کی جانچ کرنے کے لیے، ہم BigQuery پر جا کر ڈیٹا کو دیکھ سکتے ہیں۔ نیچے دی گئی کمانڈ کو استعمال کرنے کے بعد آپ کو ڈیٹاسیٹ کی پہلی چند قطاریں نظر آنی چاہئیں۔ اب جبکہ ہمارے پاس BigQuery میں ڈیٹا محفوظ ہے، ہم مزید تجزیہ کرنے کے ساتھ ساتھ ساتھیوں کے ساتھ ڈیٹا کا اشتراک کر سکتے ہیں اور کاروباری سوالات کے جوابات دینا شروع کر سکتے ہیں۔

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

ہم ایک اسٹریم ڈیٹا پروسیسنگ پائپ لائن بناتے ہیں۔ حصہ 2
شکل 5: BigQuery

حاصل يہ ہوا

ہم امید کرتے ہیں کہ یہ پوسٹ اسٹریمنگ ڈیٹا پائپ لائن بنانے کے ساتھ ساتھ ڈیٹا کو مزید قابل رسائی بنانے کے طریقے تلاش کرنے کی ایک مفید مثال کے طور پر کام کرے گی۔ اس فارمیٹ میں ڈیٹا کو ذخیرہ کرنے سے ہمیں بہت سے فوائد حاصل ہوتے ہیں۔ اب ہم اہم سوالات کے جوابات دینا شروع کر سکتے ہیں جیسے کہ کتنے لوگ ہماری پروڈکٹ استعمال کرتے ہیں؟ کیا آپ کا صارف کی بنیاد وقت کے ساتھ بڑھ رہی ہے؟ مصنوعات کے کن پہلوؤں کے ساتھ لوگ سب سے زیادہ تعامل کرتے ہیں؟ اور کیا ایسی غلطیاں ہیں جہاں نہیں ہونی چاہئیں؟ یہ وہ سوالات ہیں جو ادارے کے لیے دلچسپی کا باعث ہوں گے۔ ان سوالات کے جوابات سے ابھرنے والی بصیرت کی بنیاد پر، ہم پروڈکٹ کو بہتر بنا سکتے ہیں اور صارف کی مصروفیت کو بڑھا سکتے ہیں۔

بیم اس قسم کی ورزش کے لیے واقعی مفید ہے اور اس کے استعمال کے کئی دوسرے دلچسپ کیسز بھی ہیں۔ مثال کے طور پر، آپ ریئل ٹائم میں اسٹاک ٹک ڈیٹا کا تجزیہ کرنا چاہتے ہیں اور تجزیے کی بنیاد پر تجارت کرنا چاہتے ہیں، شاید آپ کے پاس گاڑیوں سے آنے والا سینسر ڈیٹا ہے اور آپ ٹریفک کی سطح کا حساب لگانا چاہتے ہیں۔ مثال کے طور پر، آپ ایک گیمنگ کمپنی بھی بن سکتے ہیں جو صارف کا ڈیٹا اکٹھا کرتی ہے اور اسے کلیدی میٹرکس کو ٹریک کرنے کے لیے ڈیش بورڈ بنانے کے لیے استعمال کرتی ہے۔ ٹھیک ہے حضرات، یہ ایک اور پوسٹ کے لیے ایک موضوع ہے، پڑھنے کا شکریہ، اور جو مکمل کوڈ دیکھنا چاہتے ہیں، ان کے لیے نیچے میرے GitHub کا لنک ہے۔

https://github.com/DFoly/User_log_pipeline

بس۔ پہلا حصہ پڑھیں.

ماخذ: www.habr.com

نیا تبصرہ شامل کریں