هيلو سڀ. اسان مضمون جي آخري حصي جو ترجمو شيئر ڪري رهيا آهيون، خاص طور تي ڪورس جي شاگردن لاءِ تيار ڪيو ويو آهي.
اپاچي بيام ۽ ڊيٽا فلو ريئل ٽائيم پائپ لائنز لاءِ
Google Cloud سيٽ اپ ڪريو
نوٽ: مون پائپ لائن کي هلائڻ ۽ ڪسٽم لاگ ڊيٽا کي شايع ڪرڻ لاءِ گوگل ڪلائوڊ شيل استعمال ڪيو ڇاڪاڻ ته مون کي پٿون 3 ۾ پائپ لائن هلائڻ ۾ ڏکيائي ٿي رهي هئي. گوگل ڪلائوڊ شيل پائٿون 2 استعمال ڪري ٿو، جيڪو Apache Beam سان وڌيڪ مطابقت رکي ٿو.
پائپ لائن کي شروع ڪرڻ لاء، اسان کي سيٽنگون ۾ ٿورڙو کڙو ڪرڻو پوندو. توھان مان انھن لاءِ جن اڳ ۾ GCP استعمال نه ڪيو آھي، توھان کي ھيٺ ڏنل 6 مرحلن تي عمل ڪرڻو پوندو ھن ۾ بيان ڪيل
ان کان پوء، اسان کي اسان جي اسڪرپٽ کي گوگل ڪلائوڊ اسٽوريج تي اپلوڊ ڪرڻ جي ضرورت پوندي ۽ انهن کي اسان جي گوگل ڪلائوڊ شيل تي نقل ڪرڻو پوندو. ڪلائوڊ اسٽوريج تي اپ لوڊ ڪرڻ تمام معمولي آهي (هڪ وضاحت ملي سگهي ٿي
2 شڪل
اهي حڪم جيڪي اسان کي فائلن کي نقل ڪرڻ ۽ گهربل لائبريرين کي انسٽال ڪرڻ جي ضرورت آهي هيٺ ڏنل فهرست ڏنل آهن.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
اسان جي ڊيٽابيس ۽ ٽيبل ٺاهڻ
هڪ دفعو اسان سيٽ اپ سان لاڳاپيل سڀئي مرحلا مڪمل ڪري چڪا آهيون، ايندڙ شيء جيڪا اسان کي ڪرڻ جي ضرورت آهي BigQuery ۾ هڪ ڊيٽا سيٽ ۽ ٽيبل ٺاهيو. ھن کي ڪرڻ جا ڪيترائي طريقا آھن، پر سڀ کان سادو آھي گوگل ڪلائوڊ ڪنسول استعمال ڪرڻ لاءِ پهريون ڀيرو ڊيٽا سيٽ ٺاھيو. توھان ھيٺ ڏنل قدمن تي عمل ڪري سگھو ٿا
شڪل 3. ٽيبل جي ترتيب
استعمال ڪندڙ لاگ ڊيٽا شايع ڪرڻ
Pub/Sub اسان جي پائپ لائن جو هڪ نازڪ حصو آهي ڇاڪاڻ ته اها ڪيترن ئي آزاد ايپليڪيشنن کي هڪ ٻئي سان رابطو ڪرڻ جي اجازت ڏئي ٿي. خاص طور تي، اهو هڪ وچولي طور ڪم ڪري ٿو جيڪو اسان کي ايپليڪيشنن جي وچ ۾ پيغام موڪلڻ ۽ وصول ڪرڻ جي اجازت ڏئي ٿو. پهرين شيء اسان کي ڪرڻ جي ضرورت آهي هڪ موضوع ٺاهي. بس ڪنسول ۾ پب/سب ڏانھن وڃو ۽ موضوع ٺاھيو تي ڪلڪ ڪريو.
هيٺ ڏنل ڪوڊ اسان جي اسڪرپٽ کي سڏي ٿو مٿي بيان ڪيل لاگ ڊيٽا پيدا ڪرڻ لاءِ ۽ پوءِ ڳنڍي ٿو ۽ لاگس کي Pub/Sub ڏانهن موڪلي ٿو. اسان کي صرف هڪ شيء ٺاهڻ جي ضرورت آهي هڪ اعتراض پبلشر ڪلائنٽ، طريقو استعمال ڪندي موضوع ڏانهن رستو بيان ڪريو topic_path
۽ فنڪشن کي ڪال ڪريو publish
с topic_path
۽ ڊيٽا. مهرباني ڪري نوٽ ڪريو ته اسان درآمد ڪريون ٿا generate_log_line
اسان جي اسڪرپٽ مان stream_logs
، تنهن ڪري پڪ ڪريو ته اهي فائلون ساڳئي فولڊر ۾ آهن، ٻي صورت ۾ توهان کي درآمد جي غلطي ملندي. ان کان پوء اسان هن کي اسان جي گوگل ڪنسول ذريعي هلائي سگهون ٿا:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
جيئن ئي فائل هلندي، اسان لاگ ڊيٽا جي آئوٽ پٽ کي ڪنسول ڏانهن ڏسي سگهنداسين، جيئن هيٺ ڏنل شڪل ۾ ڏيکاريل آهي. هي اسڪرپٽ ڪم ڪندو جيستائين اسان استعمال نٿا ڪريون CTRL + سيان کي مڪمل ڪرڻ لاء.
شڪل 4. آئوٽ پٽ publish_logs.py
اسان جي پائپ لائن ڪوڊ لکڻ
ھاڻي ته اسان وٽ سڀ ڪجھ تيار آھي، اسان شروع ڪري سگھون ٿا مزي وارو حصو - بيم ۽ پٿون استعمال ڪندي اسان جي پائپ لائن کي ڪوڊ ڪرڻ. بيام پائيپ لائين ٺاھڻ لاءِ، اسان کي ھڪڙي پائپ لائن اعتراض ٺاھڻ جي ضرورت آھي (p). هڪ دفعو اسان هڪ پائپ لائن اعتراض ٺاهي ڇڏيو آهي، اسان آپريٽر کي استعمال ڪندي هڪ کان پوء هڪ کان وڌيڪ افعال لاڳو ڪري سگهون ٿا pipe (|)
. عام طور تي، ڪم فلو هيٺ ڏنل تصوير وانگر ڏسڻ ۾ اچي ٿو.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
اسان جي ڪوڊ ۾، اسان ٻه ڪسٽم فنڪشن ٺاهينداسين. فنڪشن regex_clean
، جيڪو ڊيٽا کي اسڪين ڪري ٿو ۽ ساڳئي قطار کي ٻيهر حاصل ڪري ٿو PATTERNS لسٽ جي بنياد تي فنڪشن استعمال ڪندي re.search
. فنڪشن هڪ ڪاما کان الڳ ٿيل اسٽرنگ واپس ڪري ٿو. جيڪڏهن توهان باقاعده اظهار جو ماهر نه آهيو، آئون هن کي جانچڻ جي صلاح ڏيان ٿو datetime
ان کي ڪم ڪرڻ لاء هڪ فنڪشن اندر. مون کي فائل جي شروعات ۾ هڪ درآمد جي غلطي ملي رهي هئي، جيڪا عجيب هئي. اها فهرست پوءِ فنڪشن ڏانهن منتقل ڪئي وئي آهي لکو ToBigQuery، جيڪو صرف اسان جي ڊيٽا کي ٽيبل تي شامل ڪري ٿو. بيچ DataFlow جاب ۽ اسٽريمنگ DataFlow جاب لاءِ ڪوڊ ھيٺ ڏنل آھي. بيچ ۽ اسٽريمنگ ڪوڊ جي وچ ۾ فرق صرف اهو آهي ته بيچ ۾ اسان CSV کان پڙهون ٿا src_path
فنڪشن کي استعمال ڪندي ReadFromText
بيام کان.
بيچ ڊيٽا فلو نوڪري (بيچ پروسيسنگ)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
اسٽريمنگ ڊيٽا فلو جاب (اسٽريمنگ پروسيسنگ)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
ڪنويئر کي شروع ڪندي
اسان پائپ لائن کي ڪيترن ئي مختلف طريقن سان هلائي سگهون ٿا. جيڪڏهن اسان چاهيون ٿا، اسان صرف ان کي مقامي طور تي هلائي سگهون ٿا ٽرمينل کان جڏهن GCP ۾ لاگ ان ٿيڻ دوران ريموٽ.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
بهرحال، اسان ان کي هلائڻ وارا آهيون DataFlow استعمال ڪندي. اسان هي ڪم ڪري سگھون ٿا هيٺ ڏنل ڪمانڊ کي استعمال ڪندي هيٺين گهربل پيرا ميٽرس کي ترتيب ڏيندي.
project
- توهان جي GCP پروجيڪٽ جي سڃاڻپ.runner
هڪ پائپ لائن رنر آهي جيڪو توهان جي پروگرام جو تجزيو ڪندو ۽ توهان جي پائپ لائن کي تعمير ڪندو. ڪلائوڊ ۾ هلائڻ لاءِ، توھان کي بيان ڪرڻ گھرجي DataflowRunner.staging_location
- Cloud Dataflow ڪلائوڊ اسٽوريج جو رستو انڊيڪسنگ ڪوڊ پيڪيجز لاءِ گهربل پروسيسرز لاءِ ڪم ڪري رهيا آهن.temp_location
- ڪلائوڊ ڊيٽا فلو ڪلائوڊ اسٽوريج ڏانهن رستو عارضي نوڪري فائلن کي محفوظ ڪرڻ لاءِ ٺاهي وئي جڏهن پائپ لائن هلي رهي آهي.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
جڏهن هي حڪم هلي رهيو آهي، اسان گوگل ڪنسول ۾ DataFlow ٽيب ڏانهن وڃو ۽ اسان جي پائپ لائن کي ڏسي سگهون ٿا. جڏهن اسان پائپ لائن تي ڪلڪ ڪندا آهيون، اسان کي شڪل 4 جهڙي ڪا شيءِ ڏسڻ گهرجي. ڊيبگنگ جي مقصدن لاءِ، تفصيلي لاگز کي ڏسڻ لاءِ لاگز ۽ پوءِ اسٽيڪ ڊرائيور ڏانهن وڃڻ تمام مددگار ثابت ٿي سگهي ٿو. هن مون کي ڪيترن ئي ڪيسن ۾ پائپ لائن مسئلن کي حل ڪرڻ ۾ مدد ڪئي آهي.
شڪل 4: بيم کنويئر
BigQuery ۾ اسان جي ڊيٽا تائين رسائي ڪريو
تنهن ڪري، اسان کي اڳ ۾ ئي هڪ پائپ لائن هجڻ گهرجي جيڪا اسان جي ٽيبل ۾ وهندي ڊيٽا سان گڏ. ھن کي جانچڻ لاءِ، اسان وڃي سگھون ٿا BigQuery ۽ ڊيٽا کي ڏسو. ھيٺ ڏنل حڪم استعمال ڪرڻ کان پوء توھان کي ڏسڻ گھرجي ڊيٽا سيٽ جون پھريون ڪجھ قطارون. ھاڻي جڏھن ته اسان وٽ ڊيٽا آھي BigQuery ۾ ذخيرو ٿيل آھي، اسان وڌيڪ تجزيو ڪري سگھون ٿا، گڏوگڏ ڊيٽا کي ساٿين سان شيئر ڪري سگھون ٿا ۽ ڪاروباري سوالن جا جواب ڏيڻ شروع ڪري سگھون ٿا.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
شڪل 5: BigQuery
ٿڪل
اسان کي اميد آهي ته هي پوسٽ هڪ اسٽريمنگ ڊيٽا پائپ لائن ٺاهڻ جي هڪ ڪارائتي مثال طور ڪم ڪري ٿي، انهي سان گڏ ڊيٽا کي وڌيڪ رسائي حاصل ڪرڻ جا طريقا ڳولڻ. ھن فارميٽ ۾ ڊيٽا کي ذخيرو ڪرڻ اسان کي ڪيترائي فائدا ڏئي ٿو. هاڻي اسان اهم سوالن جا جواب ڏيڻ شروع ڪري سگهون ٿا جهڙوڪ ڪيترا ماڻهو اسان جي پيداوار کي استعمال ڪندا آهن؟ ڇا توهان جو صارف بنياد وقت سان وڌي رهيو آهي؟ پراڊڪٽ جا ڪهڙا پهلو آهن جن سان ماڻهو سڀ کان وڌيڪ لهه وچڙ ڪندا آهن؟ ۽ ڇا اهڙا غلطيون آهن جتي نه هجڻ گهرجي؟ اهي اهي سوال آهن جيڪي تنظيم جي دلچسپيءَ جا هوندا. انهن بصيرت جي بنياد تي جيڪي انهن سوالن جي جوابن مان نڪرندا آهن، اسان پيداوار کي بهتر ڪري سگهون ٿا ۽ صارف جي مصروفيت کي وڌائي سگهون ٿا.
بيم واقعي هن قسم جي مشق لاءِ مفيد آهي ۽ ان سان گڏ ٻيا به ڪيترائي دلچسپ استعمال جا ڪيس آهن. مثال طور، توهان حقيقي وقت ۾ اسٽاڪ ٽڪ ڊيٽا جو تجزيو ڪرڻ چاهيو ٿا ۽ تجزيي جي بنياد تي واپار ڪرڻ چاهيو ٿا، شايد توهان وٽ سينسر ڊيٽا آهي جيڪو گاڏين مان اچي رهيو آهي ۽ ٽرئفڪ جي سطح جي حساب سان حساب ڪرڻ چاهيندا. توهان شايد، مثال طور، هڪ گیمنگ ڪمپني ٿي سگهي ٿي جيڪا صارف جي ڊيٽا گڏ ڪري ٿي ۽ ان کي استعمال ڪندي ڊيش بورڊ ٺاهڻ لاءِ اهم ميٽرڪ کي ٽريڪ ڪرڻ لاءِ. ٺيڪ آهي، حضرات، هي هڪ ٻئي پوسٽ لاءِ هڪ موضوع آهي، پڙهڻ لاءِ مهرباني، ۽ انهن لاءِ جيڪي مڪمل ڪوڊ ڏسڻ چاهين ٿا، هيٺ ڏنل لنڪ آهي منهنجي GitHub جي.
اهو سڀ ڪجهه آهي.
جو ذريعو: www.habr.com