اسان هڪ وهڪرو ڊيٽا پروسيسنگ پائپ لائن ٺاهيندا آهيون. حصو 2

هيلو سڀ. اسان مضمون جي آخري حصي جو ترجمو شيئر ڪري رهيا آهيون، خاص طور تي ڪورس جي شاگردن لاءِ تيار ڪيو ويو آهي. ڊيٽا انجنيئر. توهان پڙهي سگهو ٿا پهريون حصو هتي.

اپاچي بيام ۽ ڊيٽا فلو ريئل ٽائيم پائپ لائنز لاءِ

اسان هڪ وهڪرو ڊيٽا پروسيسنگ پائپ لائن ٺاهيندا آهيون. حصو 2

Google Cloud سيٽ اپ ڪريو

نوٽ: مون پائپ لائن کي هلائڻ ۽ ڪسٽم لاگ ڊيٽا کي شايع ڪرڻ لاءِ گوگل ڪلائوڊ شيل استعمال ڪيو ڇاڪاڻ ته مون کي پٿون 3 ۾ پائپ لائن هلائڻ ۾ ڏکيائي ٿي رهي هئي. گوگل ڪلائوڊ شيل پائٿون 2 استعمال ڪري ٿو، جيڪو Apache Beam سان وڌيڪ مطابقت رکي ٿو.

پائپ لائن کي شروع ڪرڻ لاء، اسان کي سيٽنگون ۾ ٿورڙو کڙو ڪرڻو پوندو. توھان مان انھن لاءِ جن اڳ ۾ GCP استعمال نه ڪيو آھي، توھان کي ھيٺ ڏنل 6 مرحلن تي عمل ڪرڻو پوندو ھن ۾ بيان ڪيل صفحو.

ان کان پوء، اسان کي اسان جي اسڪرپٽ کي گوگل ڪلائوڊ اسٽوريج تي اپلوڊ ڪرڻ جي ضرورت پوندي ۽ انهن کي اسان جي گوگل ڪلائوڊ شيل تي نقل ڪرڻو پوندو. ڪلائوڊ اسٽوريج تي اپ لوڊ ڪرڻ تمام معمولي آهي (هڪ وضاحت ملي سگهي ٿي هتي). اسان جون فائلون ڪاپي ڪرڻ لاءِ، اسان ٽول بار مان گوگل ڪلائوڊ شيل کولي سگھون ٿا ھيٺ ڏنل شڪل 2 ۾ کاٻي پاسي واري پهرين آئڪن تي ڪلڪ ڪري.

اسان هڪ وهڪرو ڊيٽا پروسيسنگ پائپ لائن ٺاهيندا آهيون. حصو 2
2 شڪل

اهي حڪم جيڪي اسان کي فائلن کي نقل ڪرڻ ۽ گهربل لائبريرين کي انسٽال ڪرڻ جي ضرورت آهي هيٺ ڏنل فهرست ڏنل آهن.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

اسان جي ڊيٽابيس ۽ ٽيبل ٺاهڻ

هڪ دفعو اسان سيٽ اپ سان لاڳاپيل سڀئي مرحلا مڪمل ڪري چڪا آهيون، ايندڙ شيء جيڪا اسان کي ڪرڻ جي ضرورت آهي BigQuery ۾ هڪ ڊيٽا سيٽ ۽ ٽيبل ٺاهيو. ھن کي ڪرڻ جا ڪيترائي طريقا آھن، پر سڀ کان سادو آھي گوگل ڪلائوڊ ڪنسول استعمال ڪرڻ لاءِ پهريون ڀيرو ڊيٽا سيٽ ٺاھيو. توھان ھيٺ ڏنل قدمن تي عمل ڪري سگھو ٿا لنڪاسڪيما سان ٽيبل ٺاهڻ لاءِ. اسان جي ٽيبل هوندي 7 ڪالمن، هر يوزر لاگ جي اجزاء سان ملندڙ. سھولت لاءِ، اسين سڀ ڪالمن کي اسٽرنگ طور بيان ڪنداسين، سواءِ وقت جي مقامي متغير جي، ۽ انھن کي نالو ڏينداسين انھن متغيرن جي مطابق جيڪي اسان اڳ ۾ ٺاھيون. اسان جي ٽيبل جي ترتيب کي شڪل 3 ۾ ڏسڻ گهرجي.

اسان هڪ وهڪرو ڊيٽا پروسيسنگ پائپ لائن ٺاهيندا آهيون. حصو 2
شڪل 3. ٽيبل جي ترتيب

استعمال ڪندڙ لاگ ڊيٽا شايع ڪرڻ

Pub/Sub اسان جي پائپ لائن جو هڪ نازڪ حصو آهي ڇاڪاڻ ته اها ڪيترن ئي آزاد ايپليڪيشنن کي هڪ ٻئي سان رابطو ڪرڻ جي اجازت ڏئي ٿي. خاص طور تي، اهو هڪ وچولي طور ڪم ڪري ٿو جيڪو اسان کي ايپليڪيشنن جي وچ ۾ پيغام موڪلڻ ۽ وصول ڪرڻ جي اجازت ڏئي ٿو. پهرين شيء اسان کي ڪرڻ جي ضرورت آهي هڪ موضوع ٺاهي. بس ڪنسول ۾ پب/سب ڏانھن وڃو ۽ موضوع ٺاھيو تي ڪلڪ ڪريو.

هيٺ ڏنل ڪوڊ اسان جي اسڪرپٽ کي سڏي ٿو مٿي بيان ڪيل لاگ ڊيٽا پيدا ڪرڻ لاءِ ۽ پوءِ ڳنڍي ٿو ۽ لاگس کي Pub/Sub ڏانهن موڪلي ٿو. اسان کي صرف هڪ شيء ٺاهڻ جي ضرورت آهي هڪ اعتراض پبلشر ڪلائنٽ، طريقو استعمال ڪندي موضوع ڏانهن رستو بيان ڪريو topic_path ۽ فنڪشن کي ڪال ڪريو publish с topic_path ۽ ڊيٽا. مهرباني ڪري نوٽ ڪريو ته اسان درآمد ڪريون ٿا generate_log_line اسان جي اسڪرپٽ مان stream_logs، تنهن ڪري پڪ ڪريو ته اهي فائلون ساڳئي فولڊر ۾ آهن، ٻي صورت ۾ توهان کي درآمد جي غلطي ملندي. ان کان پوء اسان هن کي اسان جي گوگل ڪنسول ذريعي هلائي سگهون ٿا:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

جيئن ئي فائل هلندي، اسان لاگ ڊيٽا جي آئوٽ پٽ کي ڪنسول ڏانهن ڏسي سگهنداسين، جيئن هيٺ ڏنل شڪل ۾ ڏيکاريل آهي. هي اسڪرپٽ ڪم ڪندو جيستائين اسان استعمال نٿا ڪريون CTRL + سيان کي مڪمل ڪرڻ لاء.

اسان هڪ وهڪرو ڊيٽا پروسيسنگ پائپ لائن ٺاهيندا آهيون. حصو 2
شڪل 4. آئوٽ پٽ publish_logs.py

اسان جي پائپ لائن ڪوڊ لکڻ

ھاڻي ته اسان وٽ سڀ ڪجھ تيار آھي، اسان شروع ڪري سگھون ٿا مزي وارو حصو - بيم ۽ پٿون استعمال ڪندي اسان جي پائپ لائن کي ڪوڊ ڪرڻ. بيام پائيپ لائين ٺاھڻ لاءِ، اسان کي ھڪڙي پائپ لائن اعتراض ٺاھڻ جي ضرورت آھي (p). هڪ دفعو اسان هڪ پائپ لائن اعتراض ٺاهي ڇڏيو آهي، اسان آپريٽر کي استعمال ڪندي هڪ کان پوء هڪ کان وڌيڪ افعال لاڳو ڪري سگهون ٿا pipe (|). عام طور تي، ڪم فلو هيٺ ڏنل تصوير وانگر ڏسڻ ۾ اچي ٿو.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

اسان جي ڪوڊ ۾، اسان ٻه ڪسٽم فنڪشن ٺاهينداسين. فنڪشن regex_clean، جيڪو ڊيٽا کي اسڪين ڪري ٿو ۽ ساڳئي قطار کي ٻيهر حاصل ڪري ٿو PATTERNS لسٽ جي بنياد تي فنڪشن استعمال ڪندي re.search. فنڪشن هڪ ڪاما کان الڳ ٿيل اسٽرنگ واپس ڪري ٿو. جيڪڏهن توهان باقاعده اظهار جو ماهر نه آهيو، آئون هن کي جانچڻ جي صلاح ڏيان ٿو سبق ۽ ڪوڊ چيڪ ڪرڻ لاءِ نوٽ پيڊ ۾ مشق ڪريو. ان کان پوء اسان هڪ رواج ParDo فنڪشن کي سڏيو ويندو آهي ۾ ورهايو ويو، جيڪو متوازي پروسيسنگ لاءِ بيام ٽرانسفارم جو هڪ تغير آهي. Python ۾، اهو هڪ خاص طريقي سان ڪيو ويندو آهي - اسان کي هڪ ڪلاس ٺاهڻ گهرجي جيڪو DoFn بيام ڪلاس مان ورثي ۾ ملي ٿو. Split فنڪشن پوئين فنڪشن مان پارس ٿيل قطار کي وٺي ٿو ۽ اسان جي BigQuery ٽيبل ۾ ڪالمن جي نالن سان ملندڙ چاٻين سان لغتن جي هڪ فهرست ڏي ٿو. ھن فنڪشن بابت نوٽ ڪرڻ لاء ڪجھھ آھي: مون کي درآمد ڪرڻو پيو datetime ان کي ڪم ڪرڻ لاء هڪ فنڪشن اندر. مون کي فائل جي شروعات ۾ هڪ درآمد جي غلطي ملي رهي هئي، جيڪا عجيب هئي. اها فهرست پوءِ فنڪشن ڏانهن منتقل ڪئي وئي آهي لکو ToBigQuery، جيڪو صرف اسان جي ڊيٽا کي ٽيبل تي شامل ڪري ٿو. بيچ DataFlow جاب ۽ اسٽريمنگ DataFlow جاب لاءِ ڪوڊ ھيٺ ڏنل آھي. بيچ ۽ اسٽريمنگ ڪوڊ جي وچ ۾ فرق صرف اهو آهي ته بيچ ۾ اسان CSV کان پڙهون ٿا src_pathفنڪشن کي استعمال ڪندي ReadFromText بيام کان.

بيچ ڊيٽا فلو نوڪري (بيچ پروسيسنگ)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

اسٽريمنگ ڊيٽا فلو جاب (اسٽريمنگ پروسيسنگ)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

ڪنويئر کي شروع ڪندي

اسان پائپ لائن کي ڪيترن ئي مختلف طريقن سان هلائي سگهون ٿا. جيڪڏهن اسان چاهيون ٿا، اسان صرف ان کي مقامي طور تي هلائي سگهون ٿا ٽرمينل کان جڏهن GCP ۾ لاگ ان ٿيڻ دوران ريموٽ.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

بهرحال، اسان ان کي هلائڻ وارا آهيون DataFlow استعمال ڪندي. اسان هي ڪم ڪري سگھون ٿا هيٺ ڏنل ڪمانڊ کي استعمال ڪندي هيٺين گهربل پيرا ميٽرس کي ترتيب ڏيندي.

  • project - توهان جي GCP پروجيڪٽ جي سڃاڻپ.
  • runner هڪ پائپ لائن رنر آهي جيڪو توهان جي پروگرام جو تجزيو ڪندو ۽ توهان جي پائپ لائن کي تعمير ڪندو. ڪلائوڊ ۾ هلائڻ لاءِ، توھان کي بيان ڪرڻ گھرجي DataflowRunner.
  • staging_location - Cloud Dataflow ڪلائوڊ اسٽوريج جو رستو انڊيڪسنگ ڪوڊ پيڪيجز لاءِ گهربل پروسيسرز لاءِ ڪم ڪري رهيا آهن.
  • temp_location - ڪلائوڊ ڊيٽا فلو ڪلائوڊ اسٽوريج ڏانهن رستو عارضي نوڪري فائلن کي محفوظ ڪرڻ لاءِ ٺاهي وئي جڏهن پائپ لائن هلي رهي آهي.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

جڏهن هي حڪم هلي رهيو آهي، اسان گوگل ڪنسول ۾ DataFlow ٽيب ڏانهن وڃو ۽ اسان جي پائپ لائن کي ڏسي سگهون ٿا. جڏهن اسان پائپ لائن تي ڪلڪ ڪندا آهيون، اسان کي شڪل 4 جهڙي ڪا شيءِ ڏسڻ گهرجي. ڊيبگنگ جي مقصدن لاءِ، تفصيلي لاگز کي ڏسڻ لاءِ لاگز ۽ پوءِ اسٽيڪ ڊرائيور ڏانهن وڃڻ تمام مددگار ثابت ٿي سگهي ٿو. هن مون کي ڪيترن ئي ڪيسن ۾ پائپ لائن مسئلن کي حل ڪرڻ ۾ مدد ڪئي آهي.

اسان هڪ وهڪرو ڊيٽا پروسيسنگ پائپ لائن ٺاهيندا آهيون. حصو 2
شڪل 4: بيم کنويئر

BigQuery ۾ اسان جي ڊيٽا تائين رسائي ڪريو

تنهن ڪري، اسان کي اڳ ۾ ئي هڪ پائپ لائن هجڻ گهرجي جيڪا اسان جي ٽيبل ۾ وهندي ڊيٽا سان گڏ. ھن کي جانچڻ لاءِ، اسان وڃي سگھون ٿا BigQuery ۽ ڊيٽا کي ڏسو. ھيٺ ڏنل حڪم استعمال ڪرڻ کان پوء توھان کي ڏسڻ گھرجي ڊيٽا سيٽ جون پھريون ڪجھ قطارون. ھاڻي جڏھن ته اسان وٽ ڊيٽا آھي BigQuery ۾ ذخيرو ٿيل آھي، اسان وڌيڪ تجزيو ڪري سگھون ٿا، گڏوگڏ ڊيٽا کي ساٿين سان شيئر ڪري سگھون ٿا ۽ ڪاروباري سوالن جا جواب ڏيڻ شروع ڪري سگھون ٿا.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

اسان هڪ وهڪرو ڊيٽا پروسيسنگ پائپ لائن ٺاهيندا آهيون. حصو 2
شڪل 5: BigQuery

ٿڪل

اسان کي اميد آهي ته هي پوسٽ هڪ اسٽريمنگ ڊيٽا پائپ لائن ٺاهڻ جي هڪ ڪارائتي مثال طور ڪم ڪري ٿي، انهي سان گڏ ڊيٽا کي وڌيڪ رسائي حاصل ڪرڻ جا طريقا ڳولڻ. ھن فارميٽ ۾ ڊيٽا کي ذخيرو ڪرڻ اسان کي ڪيترائي فائدا ڏئي ٿو. هاڻي اسان اهم سوالن جا جواب ڏيڻ شروع ڪري سگهون ٿا جهڙوڪ ڪيترا ماڻهو اسان جي پيداوار کي استعمال ڪندا آهن؟ ڇا توهان جو صارف بنياد وقت سان وڌي رهيو آهي؟ پراڊڪٽ جا ڪهڙا پهلو آهن جن سان ماڻهو سڀ کان وڌيڪ لهه وچڙ ڪندا آهن؟ ۽ ڇا اهڙا غلطيون آهن جتي نه هجڻ گهرجي؟ اهي اهي سوال آهن جيڪي تنظيم جي دلچسپيءَ جا هوندا. انهن بصيرت جي بنياد تي جيڪي انهن سوالن جي جوابن مان نڪرندا آهن، اسان پيداوار کي بهتر ڪري سگهون ٿا ۽ صارف جي مصروفيت کي وڌائي سگهون ٿا.

بيم واقعي هن قسم جي مشق لاءِ مفيد آهي ۽ ان سان گڏ ٻيا به ڪيترائي دلچسپ استعمال جا ڪيس آهن. مثال طور، توهان حقيقي وقت ۾ اسٽاڪ ٽڪ ڊيٽا جو تجزيو ڪرڻ چاهيو ٿا ۽ تجزيي جي بنياد تي واپار ڪرڻ چاهيو ٿا، شايد توهان وٽ سينسر ڊيٽا آهي جيڪو گاڏين مان اچي رهيو آهي ۽ ٽرئفڪ جي سطح جي حساب سان حساب ڪرڻ چاهيندا. توهان شايد، مثال طور، هڪ گیمنگ ڪمپني ٿي سگهي ٿي جيڪا صارف جي ڊيٽا گڏ ڪري ٿي ۽ ان کي استعمال ڪندي ڊيش بورڊ ٺاهڻ لاءِ اهم ميٽرڪ کي ٽريڪ ڪرڻ لاءِ. ٺيڪ آهي، حضرات، هي هڪ ٻئي پوسٽ لاءِ هڪ موضوع آهي، پڙهڻ لاءِ مهرباني، ۽ انهن لاءِ جيڪي مڪمل ڪوڊ ڏسڻ چاهين ٿا، هيٺ ڏنل لنڪ آهي منهنجي GitHub جي.

https://github.com/DFoly/User_log_pipeline

اهو سڀ ڪجهه آهي. پڙهو حصو پهريون.

جو ذريعو: www.habr.com

تبصرو شامل ڪريو