سب کو سلام. ہم مضمون کے آخری حصے کا ترجمہ شیئر کر رہے ہیں، جو خاص طور پر کورس کے طلباء کے لیے تیار کیا گیا ہے۔
ریئل ٹائم پائپ لائنز کے لیے اپاچی بیم اور ڈیٹا فلو
گوگل کلاؤڈ ترتیب دیا جا رہا ہے۔
نوٹ: میں نے پائپ لائن چلانے اور حسب ضرورت لاگ ڈیٹا شائع کرنے کے لیے گوگل کلاؤڈ شیل کا استعمال کیا کیونکہ مجھے Python 3 میں پائپ لائن چلانے میں دشواری ہو رہی تھی۔ Google Cloud Shell Python 2 کا استعمال کرتا ہے، جو Apache Beam کے ساتھ زیادہ مطابقت رکھتا ہے۔
پائپ لائن شروع کرنے کے لئے، ہمیں ترتیبات میں تھوڑا سا کھودنے کی ضرورت ہے۔ آپ میں سے ان لوگوں کے لیے جنہوں نے پہلے GCP استعمال نہیں کیا ہے، آپ کو اس میں بیان کردہ مندرجہ ذیل 6 مراحل پر عمل کرنے کی ضرورت ہوگی۔
اس کے بعد، ہمیں اپنے اسکرپٹس کو گوگل کلاؤڈ اسٹوریج پر اپ لوڈ کرنے اور انہیں اپنے گوگل کلاؤڈ شیل میں کاپی کرنے کی ضرورت ہوگی۔ کلاؤڈ اسٹوریج پر اپ لوڈ کرنا بہت معمولی بات ہے (تفصیل مل سکتی ہے۔
ساخت، پیکر 2
فائلوں کو کاپی کرنے اور مطلوبہ لائبریریوں کو انسٹال کرنے کے لیے ہمیں جن کمانڈز کی ضرورت ہے وہ ذیل میں درج ہیں۔
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
ہمارا ڈیٹا بیس اور ٹیبل بنانا
ایک بار جب ہم سیٹ اپ سے متعلق تمام مراحل مکمل کر لیتے ہیں، تو اگلا کام جو ہمیں کرنا ہے وہ ہے BigQuery میں ڈیٹا سیٹ اور ٹیبل بنانا۔ ایسا کرنے کے کئی طریقے ہیں، لیکن سب سے آسان یہ ہے کہ پہلے ڈیٹا سیٹ بنا کر گوگل کلاؤڈ کنسول استعمال کریں۔ آپ ذیل کے مراحل پر عمل کر سکتے ہیں۔
شکل 3. ٹیبل لے آؤٹ
صارف لاگ ڈیٹا شائع کرنا
Pub/Sub ہماری پائپ لائن کا ایک اہم جزو ہے کیونکہ یہ متعدد آزاد ایپلی کیشنز کو ایک دوسرے کے ساتھ بات چیت کرنے کی اجازت دیتا ہے۔ خاص طور پر، یہ ایک بیچوان کے طور پر کام کرتا ہے جو ہمیں ایپلیکیشنز کے درمیان پیغامات بھیجنے اور وصول کرنے کی اجازت دیتا ہے۔ سب سے پہلے ہمیں ایک موضوع بنانے کی ضرورت ہے۔ کنسول میں بس Pub/Sub پر جائیں اور CREATE TOPIC پر کلک کریں۔
نیچے دیا گیا کوڈ اوپر بیان کردہ لاگ ڈیٹا تیار کرنے کے لیے ہماری اسکرپٹ کو کال کرتا ہے اور پھر لاگز کو Pub/Sub کو جوڑ کر بھیجتا ہے۔ ہمیں صرف ایک چیز بنانے کی ضرورت ہے۔ پبلشر کلائنٹ، طریقہ کا استعمال کرتے ہوئے موضوع کے راستے کی وضاحت کریں۔ topic_path
اور فنکشن کو کال کریں۔ publish
с topic_path
اور ڈیٹا. براہ کرم نوٹ کریں کہ ہم درآمد کرتے ہیں۔ generate_log_line
ہمارے اسکرپٹ سے stream_logs
، لہذا یقینی بنائیں کہ یہ فائلیں ایک ہی فولڈر میں ہیں، بصورت دیگر آپ کو درآمد کی خرابی ملے گی۔ اس کے بعد ہم اسے اپنے گوگل کنسول کے ذریعے چلا سکتے ہیں:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
جیسے ہی فائل چلتی ہے، ہم کنسول میں لاگ ڈیٹا کی آؤٹ پٹ دیکھ سکیں گے، جیسا کہ نیچے دی گئی تصویر میں دکھایا گیا ہے۔ یہ اسکرپٹ تب تک کام کرے گا جب تک ہم استعمال نہیں کرتے CTRL + Cاسے مکمل کرنے کے لیے.
شکل 4. آؤٹ پٹ publish_logs.py
ہمارا پائپ لائن کوڈ لکھنا
اب جب کہ ہمارے پاس سب کچھ تیار ہے، ہم تفریحی حصہ شروع کر سکتے ہیں - بیم اور ازگر کا استعمال کرتے ہوئے اپنی پائپ لائن کو کوڈنگ کرنا۔ بیم پائپ لائن بنانے کے لیے، ہمیں پائپ لائن آبجیکٹ (p) بنانے کی ضرورت ہے۔ ایک بار جب ہم پائپ لائن آبجیکٹ بنا لیتے ہیں، تو ہم آپریٹر کا استعمال کرتے ہوئے ایک کے بعد ایک متعدد فنکشن لگا سکتے ہیں۔ pipe (|)
. عام طور پر، ورک فلو نیچے کی تصویر کی طرح لگتا ہے۔
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
ہمارے کوڈ میں، ہم دو کسٹم فنکشنز بنائیں گے۔ فنکشن regex_clean
، جو ڈیٹا کو اسکین کرتا ہے اور فنکشن کا استعمال کرتے ہوئے PATTERNS فہرست کی بنیاد پر متعلقہ قطار کو بازیافت کرتا ہے۔ re.search
. فنکشن کوما سے الگ کردہ سٹرنگ لوٹاتا ہے۔ اگر آپ ریگولر ایکسپریشن کے ماہر نہیں ہیں تو میں اسے چیک کرنے کی تجویز کرتا ہوں۔ datetime
اسے کام کرنے کے لیے فنکشن کے اندر۔ مجھے فائل کے آغاز میں ایک درآمدی غلطی ہو رہی تھی، جو عجیب تھی۔ اس کے بعد یہ فہرست فنکشن کو بھیج دی جاتی ہے۔ ToBigQuery لکھیں۔، جو صرف ہمارے ڈیٹا کو ٹیبل میں شامل کرتا ہے۔ بیچ ڈیٹا فلو جاب اور سٹریمنگ ڈیٹا فلو جاب کا کوڈ ذیل میں دیا گیا ہے۔ بیچ اور اسٹریمنگ کوڈ کے درمیان فرق صرف یہ ہے کہ بیچ میں ہم CSV سے پڑھتے ہیں۔ src_path
فنکشن کا استعمال کرتے ہوئے ReadFromText
بیم سے
بیچ ڈیٹا فلو جاب (بیچ پروسیسنگ)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
سٹریمنگ ڈیٹا فلو جاب (سٹریم پروسیسنگ)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
کنویئر شروع کرنا
ہم پائپ لائن کو کئی مختلف طریقوں سے چلا سکتے ہیں۔ اگر ہم چاہیں تو، ہم اسے مقامی طور پر ٹرمینل سے چلا سکتے ہیں جب کہ GCP میں دور سے لاگ ان ہوں۔
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
تاہم، ہم اسے ڈیٹا فلو کا استعمال کرتے ہوئے چلانے جا رہے ہیں۔ ہم درج ذیل مطلوبہ پیرامیٹرز کو ترتیب دے کر نیچے دی گئی کمانڈ کا استعمال کر کے یہ کر سکتے ہیں۔
project
- آپ کے GCP پروجیکٹ کی ID۔runner
ایک پائپ لائن رنر ہے جو آپ کے پروگرام کا تجزیہ کرے گا اور آپ کی پائپ لائن تعمیر کرے گا۔ کلاؤڈ میں چلانے کے لیے، آپ کو ڈیٹا فلو رنر کی وضاحت کرنی ہوگی۔staging_location
- کلاؤڈ ڈیٹا فلو کلاؤڈ اسٹوریج کا راستہ انڈیکسنگ کوڈ پیکجوں کے لیے جو کام کرنے والے پروسیسرز کو درکار ہے۔temp_location
- پائپ لائن کے چلنے کے دوران تخلیق کردہ عارضی ملازمت کی فائلوں کو ذخیرہ کرنے کے لیے کلاؤڈ ڈیٹا فلو کلاؤڈ اسٹوریج کا راستہ۔streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
جب یہ کمانڈ چل رہی ہو، ہم گوگل کنسول میں ڈیٹا فلو ٹیب پر جا کر اپنی پائپ لائن دیکھ سکتے ہیں۔ جب ہم پائپ لائن پر کلک کرتے ہیں، تو ہمیں شکل 4 سے ملتا جلتا کچھ نظر آنا چاہیے۔ ڈیبگنگ کے مقاصد کے لیے، تفصیلی لاگز دیکھنے کے لیے Logs اور پھر Stackdriver پر جانا بہت مددگار ثابت ہو سکتا ہے۔ اس سے مجھے کئی معاملات میں پائپ لائن کے مسائل حل کرنے میں مدد ملی ہے۔
شکل 4: بیم کنویئر
BigQuery میں ہمارے ڈیٹا تک رسائی حاصل کریں۔
لہذا، ہمارے پاس پہلے سے ہی ایک پائپ لائن چلنی چاہئے جس میں ڈیٹا ہمارے ٹیبل میں بہہ رہا ہو۔ اس کی جانچ کرنے کے لیے، ہم BigQuery پر جا کر ڈیٹا کو دیکھ سکتے ہیں۔ نیچے دی گئی کمانڈ کو استعمال کرنے کے بعد آپ کو ڈیٹاسیٹ کی پہلی چند قطاریں نظر آنی چاہئیں۔ اب جبکہ ہمارے پاس BigQuery میں ڈیٹا محفوظ ہے، ہم مزید تجزیہ کرنے کے ساتھ ساتھ ساتھیوں کے ساتھ ڈیٹا کا اشتراک کر سکتے ہیں اور کاروباری سوالات کے جوابات دینا شروع کر سکتے ہیں۔
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
شکل 5: BigQuery
حاصل يہ ہوا
ہم امید کرتے ہیں کہ یہ پوسٹ اسٹریمنگ ڈیٹا پائپ لائن بنانے کے ساتھ ساتھ ڈیٹا کو مزید قابل رسائی بنانے کے طریقے تلاش کرنے کی ایک مفید مثال کے طور پر کام کرے گی۔ اس فارمیٹ میں ڈیٹا کو ذخیرہ کرنے سے ہمیں بہت سے فوائد حاصل ہوتے ہیں۔ اب ہم اہم سوالات کے جوابات دینا شروع کر سکتے ہیں جیسے کہ کتنے لوگ ہماری پروڈکٹ استعمال کرتے ہیں؟ کیا آپ کا صارف کی بنیاد وقت کے ساتھ بڑھ رہی ہے؟ مصنوعات کے کن پہلوؤں کے ساتھ لوگ سب سے زیادہ تعامل کرتے ہیں؟ اور کیا ایسی غلطیاں ہیں جہاں نہیں ہونی چاہئیں؟ یہ وہ سوالات ہیں جو ادارے کے لیے دلچسپی کا باعث ہوں گے۔ ان سوالات کے جوابات سے ابھرنے والی بصیرت کی بنیاد پر، ہم پروڈکٹ کو بہتر بنا سکتے ہیں اور صارف کی مصروفیت کو بڑھا سکتے ہیں۔
بیم اس قسم کی ورزش کے لیے واقعی مفید ہے اور اس کے استعمال کے کئی دوسرے دلچسپ کیسز بھی ہیں۔ مثال کے طور پر، آپ ریئل ٹائم میں اسٹاک ٹک ڈیٹا کا تجزیہ کرنا چاہتے ہیں اور تجزیے کی بنیاد پر تجارت کرنا چاہتے ہیں، شاید آپ کے پاس گاڑیوں سے آنے والا سینسر ڈیٹا ہے اور آپ ٹریفک کی سطح کا حساب لگانا چاہتے ہیں۔ مثال کے طور پر، آپ ایک گیمنگ کمپنی بھی بن سکتے ہیں جو صارف کا ڈیٹا اکٹھا کرتی ہے اور اسے کلیدی میٹرکس کو ٹریک کرنے کے لیے ڈیش بورڈ بنانے کے لیے استعمال کرتی ہے۔ ٹھیک ہے حضرات، یہ ایک اور پوسٹ کے لیے ایک موضوع ہے، پڑھنے کا شکریہ، اور جو مکمل کوڈ دیکھنا چاہتے ہیں، ان کے لیے نیچے میرے GitHub کا لنک ہے۔
بس۔
ماخذ: www.habr.com