موږ د جریان ډیټا پروسس کولو پایپ لاین رامینځته کوو. برخه 2

سلام و ټولو ته. موږ د مقالې وروستۍ برخې ژباړه شریکوو، چې په ځانګړې توګه د کورس د زده کونکو لپاره چمتو شوې. د معلوماتو انجنیر. تاسو کولی شئ لومړی برخه ولولئ دلته.

د ریښتیني وخت پایپ لاینونو لپاره اپاچی بیم او ډیټا فلو

موږ د جریان ډیټا پروسس کولو پایپ لاین رامینځته کوو. برخه 2

د ګوګل کلاوډ تنظیم کول

یادونه: ما د پایپ لاین چلولو او د دودیز لاګ ډیټا خپرولو لپاره د ګوګل کلاوډ شیل کارولی و ځکه چې زه په پایپون 3 کې د پایپ لاین په چلولو کې ستونزه درلوده. د ګوګل کلاوډ شیل Python 2 کاروي ، کوم چې د اپاچي بیم سره ډیر مطابقت لري.

د پایپ لاین پیل کولو لپاره، موږ اړتیا لرو چې په ترتیباتو کې لږ څه وخورئ. ستاسو د هغو کسانو لپاره چې مخکې یې GCP نه دی کارولی، تاسو به اړتیا ولرئ لاندې 6 مرحلې تعقیب کړئ چې پدې کې تشریح شوي پاڼه.

له دې وروسته ، موږ به اړتیا ولرو چې خپل سکریپټونه د ګوګل کلاوډ ذخیره کې اپلوډ کړو او زموږ د ګوګل کلاوډ شیل ته یې کاپي کړو. د کلاوډ ذخیره ته اپلوډ کول خورا کوچني دي (یو توضیح موندل کیدی شي دلته). زموږ د فایلونو کاپي کولو لپاره ، موږ کولی شو د ګوګل کلاوډ شیل د وسیلې بار څخه په لاندې شکل 2 کې کیڼ اړخ ته په لومړي عکس کلیک کولو سره خلاص کړو.

موږ د جریان ډیټا پروسس کولو پایپ لاین رامینځته کوو. برخه 2
2 انځور

هغه قوماندې چې موږ ورته اړتیا لرو د فایلونو کاپي کولو او د اړتیا وړ کتابتونونو نصبولو لپاره لاندې لیست شوي دي.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

زموږ ډیټابیس او میز جوړول

یوځل چې موږ ټول د تنظیم کولو اړوند مرحلې بشپړې کړې ، بل شی چې موږ یې کولو ته اړتیا لرو په BigQuery کې ډیټاسیټ او جدول رامینځته کول دي. د دې کولو لپاره ډیری لارې شتون لري، مګر ترټولو ساده یې د ګوګل کلاوډ کنسول کارول دي چې لومړی د ډیټاسیټ په جوړولو سره. تاسو کولی شئ لاندې ګامونه تعقیب کړئ مخونهد سکیما سره یو میز جوړ کړئ. زموږ میز به ولري ۷ کالمونه، د هر کارن لاګ اجزاو سره مطابقت لري. د اسانتیا لپاره، موږ به ټول کالمونه د تارونو په توګه تعریف کړو، پرته له دې چې د وخت ځایي متغیر، او د هغو متغیرونو سره سم نوم ورکړو چې موږ مخکې جوړ کړي. زموږ د میز ترتیب باید په 3 شکل کې ښکاري.

موږ د جریان ډیټا پروسس کولو پایپ لاین رامینځته کوو. برخه 2
شکل 3. د جدول ترتیب

د کارن لاګ ډاټا خپرول

Pub/Sub زموږ د پایپ لاین یوه مهمه برخه ده ځکه چې دا ډیری خپلواک غوښتنلیکونو ته اجازه ورکوي چې له یو بل سره اړیکه ونیسي. په ځانګړې توګه، دا د منځګړیتوب په توګه کار کوي چې موږ ته اجازه راکوي چې د غوښتنلیکونو ترمنځ پیغامونه واستوو او ترلاسه کړو. لومړی شی چې موږ یې کولو ته اړتیا لرو یوه موضوع رامینځته کول دي. په ساده ډول په کنسول کې Pub/Sub ته لاړ شئ او د موضوع رامینځته کولو کلیک وکړئ.

لاندې کوډ زموږ سکریپټ ته زنګ وهي ترڅو پورته تعریف شوي د لاګ ډیټا تولید کړي او بیا لاګونه Pub/Sub ته وصل او لیږي. یوازینی شی چې موږ یې کولو ته اړتیا لرو یو شی جوړ کړئ PublisherClientد میتود په کارولو سره موضوع ته لاره مشخص کړئ topic_path او فنکشن ته زنګ ووهئ publish с topic_path او ډاټا. مهرباني وکړئ په یاد ولرئ چې موږ واردوو generate_log_line زموږ له سکریپټ څخه stream_logsنو ډاډ ترلاسه کړئ چې دا فایلونه په ورته فولډر کې دي، که نه نو تاسو به د وارداتو تېروتنه ترلاسه کړئ. بیا موږ کولی شو دا زموږ د ګوګل کنسول په کارولو سره پرمخ یوسو:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

هرڅومره ژر چې فایل چلیږي ، موږ به وکولی شو کنسول ته د لاګ ډیټا محصول وګورو ، لکه څنګه چې لاندې عکس کې ښودل شوي. دا سکریپټ به تر هغه وخته کار وکړي چې موږ یې ونه کاروو CTRL + Cد بشپړولو لپاره.

موږ د جریان ډیټا پروسس کولو پایپ لاین رامینځته کوو. برخه 2
شکل 4. محصول publish_logs.py

زموږ د پایپ لاین کوډ لیکل

اوس چې موږ هرڅه چمتو کړي، موږ کولی شو د ساتیرۍ برخه پیل کړو - د بیم او پایتون په کارولو سره زموږ پایپ لاین کوډ کول. د بیم پایپ لاین رامینځته کولو لپاره ، موږ اړتیا لرو د پایپ لاین څیز جوړ کړو (p). یوځل چې موږ د پایپ لاین څیز رامینځته کړو ، موږ کولی شو د آپریټر په کارولو سره یو له بل وروسته ډیری دندې پلي کړو pipe (|). په عموم کې ، د کار جریان د لاندې عکس په څیر ښکاري.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

زموږ په کوډ کې، موږ به دوه دودیز فعالیتونه جوړ کړو. فعالیت regex_clean، کوم چې ډاټا سکین کوي ​​او د فعالیت په کارولو سره د PATTERNS لیست پراساس اړوند قطار بیرته ترلاسه کوي re.search. فنکشن د کوما جلا شوي تار راګرځوي. که تاسو د منظم بیان متخصص نه یاست، زه وړاندیز کوم چې دا وګورئ ښوونه او د کوډ چیک کولو لپاره په نوټ پیډ کې تمرین وکړئ. له دې وروسته موږ د دودیز ParDo فنکشن تعریف کوو چې نومیږي وېشل شوى، کوم چې د موازي پروسس لپاره د بیم بدلون بدلون دی. په Python کې، دا په ځانګړي ډول ترسره کیږي - موږ باید یو ټولګی جوړ کړو چې د DoFn بیم ټولګي څخه په میراث کې وي. د سپلیټ فنکشن د تیرو فنکشن څخه پارس شوی قطار اخلي او زموږ د BigQuery جدول کې د کالم نومونو سره ورته کیلي سره د لغتونو لیست راګرځوي. د دې فنکشن په اړه د یادولو لپاره یو څه شتون لري: زه باید وارد کړم datetime د کار کولو لپاره د فنکشن دننه. زه د فایل په پیل کې د وارداتو تېروتنه ترلاسه کوم، کوم چې عجیب و. دا لیست بیا فعالیت ته لیږدول کیږي ولیکئToBigQuery، کوم چې په ساده ډول زموږ ډاټا میز ته اضافه کوي. د بیچ ډیټا فلو دندې او سټیمینګ ډیټا فلو دندې لپاره کوډ لاندې ورکړل شوی. د بیچ او سټرینګ کوډ ترمینځ یوازینی توپیر دا دی چې په بیچ کې موږ له CSV څخه لوستل کوو src_pathد فنکشن په کارولو سره ReadFromText له بیم څخه.

د بست ډیټا فلو دنده (د بیچ پروسس کول)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

د سټریمینګ ډیټا فلو دنده (د جریان پروسس کول)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

د لیږدونکي پیل کول

موږ کولی شو پایپ لاین په بیلابیلو لارو پرمخ یوسو. که موږ وغواړو، موږ کولی شو دا یوازې په ځایی توګه له ټرمینل څخه پرمخ یوسو پداسې حال کې چې GCP ته له لرې څخه ننوتل.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

په هرصورت، موږ به دا د DataFlow په کارولو سره پرمخ وړو. موږ کولی شو دا د لاندې کمانډ په کارولو سره د لاندې اړین پیرامیټونو په ترتیبولو سره ترسره کړو.

  • project - ستاسو د GCP پروژې ID.
  • runner د پایپ لاین چلونکی دی چې ستاسو برنامه به تحلیل کړي او ستاسو پایپ لاین به رامینځته کړي. په بادل کې د چلولو لپاره، تاسو باید د ډاټا فلو رنر مشخص کړئ.
  • staging_location - د کلاوډ ډیټا فلو کلاوډ ذخیره کولو لاره د کوډ کڅوړو شاخص کولو لپاره د پروسس کونکو لخوا چې کار ترسره کولو ته اړتیا لري.
  • temp_location - د پایپ لاین د چلولو پرمهال رامینځته شوي لنډمهاله دندې فایلونو ذخیره کولو لپاره د کلاوډ ډیټا فلو کلاوډ ذخیره ته لاره.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

پداسې حال کې چې دا قومانده روانه ده، موږ کولی شو د ګوګل کنسول کې د ډیټا فلو ټب ته لاړ شو او زموږ پایپ لاین وګورو. کله چې موږ په پایپ لاین کلیک کوو، موږ باید د 4 شکل ته ورته یو څه وګورو. د ډیبګ کولو موخو لپاره، دا خورا ګټور وي چې لاګ ته لاړ شئ او بیا Stackdriver ته لاړ شئ ترڅو تفصيلي لاګ وګورئ. دې ما سره په یو شمیر قضیو کې د پایپ لاین مسلو حل کولو کې مرسته کړې.

موږ د جریان ډیټا پروسس کولو پایپ لاین رامینځته کوو. برخه 2
شکل 4: د بیم لیږدونکی

په BigQuery کې زموږ معلوماتو ته لاسرسی ومومئ

نو، موږ باید دمخه یو پایپ لاین ولرو چې زموږ میز ته د ډیټا جریان لري. د دې ازموینې لپاره، موږ کولی شو BigQuery ته لاړ شو او ډاټا وګورو. د لاندې کمانډ کارولو وروسته تاسو باید د ډیټاسیټ لومړني څو قطارونه وګورئ. اوس چې موږ په BigQuery کې زیرمه شوي ډاټا لرو، موږ کولی شو نور تحلیل ترسره کړو، همدارنګه معلومات د همکارانو سره شریک کړو او د سوداګرۍ پوښتنو ته ځوابونه پیل کړو.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

موږ د جریان ډیټا پروسس کولو پایپ لاین رامینځته کوو. برخه 2
شکل 5: BigQuery

پایلې

موږ امید لرو چې دا پوسټ د سټیمینګ ډیټا پایپ لاین رامینځته کولو ګټور مثال په توګه کار کوي ، په بیله بیا ډیټا ته د لاسرسي وړ کولو لارې موندلو. پدې شکل کې د معلوماتو ذخیره کول موږ ته ډیری ګټې راکوي. اوس موږ کولی شو مهم پوښتنو ته ځواب ووایو لکه څومره خلک زموږ محصول کاروي؟ ایا ستاسو د کاروونکي اساس د وخت په تیریدو سره وده کوي؟ د محصول کوم اړخونه خلک تر ټولو ډیر متقابل عمل کوي؟ او آیا داسې تېروتنې شته چې باید نه وي؟ دا هغه پوښتنې دي چې د سازمان لپاره به په زړه پورې وي. د هغو بصیرتونو پراساس چې د دې پوښتنو له ځوابونو څخه راپورته کیږي، موږ کولی شو محصول ته وده ورکړو او د کاروونکي ښکیلتیا زیاته کړو.

بیم د دې ډول تمرین لپاره واقعیا ګټور دی او د کارونې یو شمیر نورې په زړه پورې قضیې هم لري. د مثال په توګه، تاسو ممکن په ریښتیني وخت کې د سټاک ټک ډیټا تحلیل کړئ او د تحلیل پراساس تجارت وکړئ، شاید تاسو د سینسر ډاټا لرئ چې د وسایطو څخه راځي او غواړئ د ټرافیک کچه محاسبه کړئ. تاسو کولی شئ د مثال په توګه، د لوبې کولو شرکت وي چې د کاروونکي ډاټا راټولوي او د کلیدي میټریکونو تعقیب لپاره د ډشبورډونو جوړولو لپاره کاروي. ښه، ښاغلو، دا د بل پوسټ لپاره موضوع ده، د لوستلو لپاره مننه، او د هغو کسانو لپاره چې غواړي بشپړ کوډ وګوري، لاندې زما د GitHub لینک دی.

https://github.com/DFoly/User_log_pipeline

بس. لومړۍ برخه ولولئ.

سرچینه: www.habr.com

Add a comment