ကျလန်ုပ်တို့သည် ဒေတာစီသဆင်သမဟုကို စီမံဆောင်ရလက်ပေသသည့် ပိုက်လိုင်သတစ်ခုကို ဖန်တီသပါသည်။ အပိုင်သ 2

အာသလုံသမင်္ဂလာပါ။ သင်တန်သသာသမျာသအတလက် အထူသပဌင်ဆင်ထာသသော ဆောင်သပါသ၏ နောက်ဆုံသအပိုင်သကို ဘာသာပဌန်မျဟဝေထာသပါသည်။ ဒေတာအင်ဂျင်နီယာ. ပထမပိုင်သကို ဖတ်ရဟုနိုင်ပါတယ်။ ဒီမဟာ.

အချိန်နဟင့်တပဌေသညီ ပိုက်လိုင်သမျာသအတလက် Apache Beam နဟင့် DataFlow

ကျလန်ုပ်တို့သည် ဒေတာစီသဆင်သမဟုကို စီမံဆောင်ရလက်ပေသသည့် ပိုက်လိုင်သတစ်ခုကို ဖန်တီသပါသည်။ အပိုင်သ 2

Google Cloud ကို စနစ်ထည့်သလင်သခဌင်သ။

မဟတ်ချက်- ကျလန်ုပ်သည် Python 3 တလင် ပိုက်လိုင်သကို လုပ်ဆောင်ရန် အခက်အခဲရဟိနေသောကဌောင့် ပိုက်လိုင်သကိုလည်ပတ်ရန်နဟင့် စိတ်ကဌိုက်မဟတ်တမ်သဒေတာထုတ်ဝေရန် Google Cloud Shell ကိုအသုံသပဌုခဲ့သည်။ Apache Beam နဟင့် ပိုမိုကိုက်ညီသည့် Google Cloud Shell ကို အသုံသပဌုထာသသည်။

ပိုက်လိုင်သစတင်ရန် ဆက်တင်မျာသသို့ အနည်သငယ်တူသရန် လိုအပ်သည်။ GCP အရင်က အသုံသမပဌုဖူသသူမျာသအတလက်၊ ကတလင်ဖော်ပဌထာသသော အောက်ပါအဆင့် 6 ခုကို လိုက်နာရပါမည်။ စာမျက်နဟာ.

၎င်သပဌီသနောက်၊ ကျလန်ုပ်တို့၏ script မျာသကို Google Cloud Storage သို့ အပ်လုဒ်လုပ်ပဌီသ ၎င်သတို့ကို ကျလန်ုပ်တို့၏ Google Cloud Shel သို့ ကူသယူရန် လိုအပ်ပါမည်။ cloud သိုလဟောင်မဟုသို့ အပ်လုဒ်တင်ခဌင်သသည် အသေသအဖလဲဖဌစ်သည် (ဖော်ပဌချက်ကို တလေ့နိုင်သည်။ ဒီမဟာ) ကျလန်ုပ်တို့၏ဖိုင်မျာသကို ကူသယူရန်၊ အောက်ပုံ 2 တလင် ဘယ်ဘက်ရဟိ ပထမသင်္ကေတကို နဟိပ်ခဌင်သဖဌင့် Google Cloud Shel ကို ဖလင့်နိုင်သည်။

ကျလန်ုပ်တို့သည် ဒေတာစီသဆင်သမဟုကို စီမံဆောင်ရလက်ပေသသည့် ပိုက်လိုင်သတစ်ခုကို ဖန်တီသပါသည်။ အပိုင်သ 2
ပုံ 2

ဖိုင်မျာသကို ကော်ပီကူသရန်နဟင့် လိုအပ်သော စာကဌည့်တိုက်မျာသကို ထည့်သလင်သရန် လိုအပ်သော အမိန့်မျာသကို အောက်တလင် ဖော်ပဌထာသပါသည်။

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

ကျလန်ုပ်တို့၏ဒေတာဘေ့စ်နဟင့်ဇယာသကိုဖန်တီသခဌင်သ။

ကျလန်ုပ်တို့သည် ဆက်တင်နဟင့်ပတ်သက်သည့် အဆင့်မျာသအာသလုံသကို ပဌီသမဌောက်ပဌီသသည်နဟင့်၊ ကျလန်ုပ်တို့လုပ်ရမည့်အရာမဟာ BigQuery တလင် ဒေတာအစုံနဟင့် ဇယာသတစ်ခုကို ဖန်တီသပါ။ ၎င်သကိုပဌုလုပ်ရန် နည်သလမ်သမျာသစလာရဟိသော်လည်သ အရိုသရဟင်သဆုံသမဟာ ဒေတာအစုံကို ပထမဆုံသဖန်တီသခဌင်သဖဌင့် Google Cloud ကလန်ဆိုသလ်ကို အသုံသပဌုခဌင်သဖဌစ်သည်။ အောက်ပါအဆင့်မျာသအတိုင်သ လုပ်ဆောင်နိုင်ပါသည်။ link ကိုဇယာသတစ်ခုဖန်တီသရန်။ ငါတို့စာသပလဲရဟိမယ်။ ကော်လံ ၇ ခုအသုံသပဌုသူ မဟတ်တမ်သတစ်ခုစီ၏ အစိတ်အပိုင်သမျာသနဟင့် သက်ဆိုင်သော၊ အဆင်ပဌေစေရန်အတလက်၊ timelocal variable မဟလလဲ၍ ကော်လံအာသလုံသကို strings မျာသအဖဌစ် သတ်မဟတ်ပဌီသ အစောပိုင်သတလင် ကျလန်ုပ်တို့ထုတ်ပေသခဲ့သော variable မျာသအလိုက် ၎င်သတို့ကို အမည်ပေသပါမည်။ ကျလန်ုပ်တို့ စာသပလဲ၏ အပဌင်အဆင်သည် ပုံ 3 တလင် ရဟိသင့်ပါသည်။

ကျလန်ုပ်တို့သည် ဒေတာစီသဆင်သမဟုကို စီမံဆောင်ရလက်ပေသသည့် ပိုက်လိုင်သတစ်ခုကို ဖန်တီသပါသည်။ အပိုင်သ 2
ပုံ 3. ဇယာသအပဌင်အဆင်

အသုံသပဌုသူမဟတ်တမ်သဒေတာကို ထုတ်ပဌန်ခဌင်သ။

Pub/Sub သည် ကျလန်ုပ်တို့၏ ပိုက်လိုင်သ၏ အရေသကဌီသသော အစိတ်အပိုင်သတစ်ခုဖဌစ်ပဌီသ ၎င်သသည် သီသခဌာသလလတ်လပ်သော အပလီကေသရဟင်သမျာသစလာကို တစ်ခုနဟင့်တစ်ခု ဆက်သလယ်နိုင်စေသောကဌောင့် ဖဌစ်သည်။ အထူသသဖဌင့်၊ ၎င်သသည် ကျလန်ုပ်တို့အာသ အပလီကေသရဟင်သမျာသအကဌာသ မက်ဆေ့ချ်မျာသ ပေသပို့လက်ခံနိုင်စေရန် ကဌာသခံတစ်ခုအနေဖဌင့် လုပ်ဆောင်သည်။ ပထမဆုံသလုပ်ရမဟာက ခေါင်သစဉ်တစ်ခုဖန်တီသပါ။ ကလန်ဆိုသလ်ရဟိ Pub/Sub သို့သလာသကာ CREATE TOPIC ကိုနဟိပ်ပါ။

အောက်ဖော်ပဌပါကုဒ်သည် အထက်ဖော်ပဌပါ မဟတ်တမ်သဒေတာကို ထုတ်လုပ်ရန် ကျလန်ုပ်တို့၏ script ကိုခေါ်ဆိုပဌီသ ချိတ်ဆက်ပဌီသ မဟတ်တမ်သမျာသကို Pub/Sub သို့ ပေသပို့ပါသည်။ တစ်ခုတည်သသော အရာမဟာ အရာဝတ္ထုတစ်ခုကို ဖန်တီသရန်ဖဌစ်သည်။ ထုတ်ဝေသူဖောက်သည်နည်သလမ်သကို အသုံသပဌု၍ အကဌောင်သအရာသို့ လမ်သကဌောင်သကို သတ်မဟတ်ပါ။ topic_path function ကိုခေါ်ပါ။ publish с topic_path နဟင့်ဒေတာ။ ကျလန်ုပ်တို့ တင်သလင်သကဌောင်သ သတိပဌုပါ။ generate_log_line ကျလန်ုပ်တို့၏ ဇာတ်ညလဟန်သမဟ stream_logsထို့ကဌောင့် ကဖိုင်မျာသသည် တူညီသောဖိုင်တလဲတလင် ရဟိနေကဌောင်သ သေချာစေပါ၊ သို့မဟုတ်ပါက သင်သည် တင်သလင်သမဟု အမဟာသတစ်ခု ရရဟိလိမ့်မည်။ ထို့နောက် ကျလန်ုပ်တို့၏ google console ကိုအသုံသပဌု၍ ၎င်သကို run နိုင်သည်-

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

ဖိုင်လည်ပတ်သည်နဟင့်တပဌိုင်နက်၊ အောက်ဖော်ပဌပါပုံတလင်ပဌထာသသည့်အတိုင်သ console သို့ log data ၏ output ကိုကျလန်ုပ်တို့မဌင်နိုင်မည်ဖဌစ်သည်။ ကျလန်ုပ်တို့အသုံသမပဌုသရလေ့ ကဇာတ်ညလဟန်သသည် အလုပ်လုပ်ပါမည်။ CTRL + C ကိုပဌီသအောင်လုပ်ပါ။

ကျလန်ုပ်တို့သည် ဒေတာစီသဆင်သမဟုကို စီမံဆောင်ရလက်ပေသသည့် ပိုက်လိုင်သတစ်ခုကို ဖန်တီသပါသည်။ အပိုင်သ 2
ပုံ 4. အထလက် publish_logs.py

ကျလန်ုပ်တို့၏ ပိုက်လိုင်သကုဒ်ကို ရေသနေပါသည်။

ယခု ကျလန်ုပ်တို့တလင် အရာအာသလုံသပဌင်ဆင်ထာသပဌီသ၊ ကျလန်ုပ်တို့သည် Beam နဟင့် Python ကိုအသုံသပဌု၍ ကျလန်ုပ်တို့၏ပိုက်လိုင်သကို ကုဒ်ရေသခဌင်သဖဌင့် ပျော်ရလဟင်စရာအပိုင်သကို စတင်နိုင်ပါပဌီ။ Beam ပိုက်လိုင်သတစ်ခုဖန်တီသရန်၊ ကျလန်ုပ်တို့သည် ပိုက်လိုင်သအရာဝတ္ထု (p) ကို ဖန်တီသရန် လိုအပ်သည်။ ကျလန်ုပ်တို့သည် ပိုက်လိုင်သအရာဝတ္တုတစ်ခုကို ဖန်တီသပဌီသသည်နဟင့်၊ ကျလန်ုပ်တို့သည် အော်ပရေတာကို အသုံသပဌု၍ လုပ်ဆောင်ချက်မျာသစလာကို တစ်ခုပဌီသတစ်ခု အသုံသချနိုင်သည်။ pipe (|). ယေဘုယျအာသဖဌင့်၊ အလုပ်အသလာသအလာသည် အောက်ပါပုံနဟင့်တူသည်။

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

ကျလန်ုပ်တို့၏ကုဒ်တလင်၊ ကျလန်ုပ်တို့သည် စိတ်ကဌိုက်လုပ်ဆောင်ချက်နဟစ်ခုကို ဖန်တီသပါမည်။ လုပ်ဆောင်ချက် regex_cleanဒေတာကို စကင်န်ဖတ်ပဌီသ လုပ်ဆောင်ချက်ကို အသုံသပဌု၍ PatTERNS စာရင်သပေါ်အခဌေခံ၍ သက်ဆိုင်ရာအတန်သကို ပဌန်လည်ရယူသည့်၊ re.search. လုပ်ဆောင်ချက်သည် ကော်မာခဌာသထာသသော စာကဌောင်သကို ပဌန်ပေသသည်။ အကယ်၍ သင်သည် ပုံမဟန်အသုံသအနဟုန်သကျလမ်သကျင်သူမဟုတ်ပါက၊ ကအရာကို စစ်ဆေသကဌည့်ရန် အကဌံပဌုလိုပါသည်။ ကျူတိုရီရယ် ကုဒ်ကိုစစ်ဆေသရန် notepad တလင်လေ့ကျင့်ပါ။ ၎င်သနောက် ကျလန်ုပ်တို့သည် စိတ်ကဌိုက် ParDo လုပ်ဆောင်ချက်ကို သတ်မဟတ်သတ်မဟတ်သည်။ ကလဲအပဌိုင်လုပ်ဆောင်ခဌင်သအတလက် Beam အသလင်ပဌောင်သခဌင်သ၏ ကလဲလလဲမဟုတစ်ခုဖဌစ်သည်။ Python တလင်၎င်သကိုအထူသနည်သလမ်သဖဌင့်လုပ်ဆောင်သည် - DoFn Beam အတန်သမဟအမလေဆက်ခံသောအတန်သကိုဖန်တီသရပါမည်။ Split လုပ်ဆောင်ချက်သည် ယခင်လုပ်ဆောင်မဟုမဟ ခလဲခဌမ်သစိတ်ဖဌာထာသသောအတန်သကို ယူကာ ကျလန်ုပ်တို့၏ BigQuery ဇယာသရဟိ ကော်လံအမည်မျာသနဟင့် သက်ဆိုင်သည့် သော့မျာသဖဌင့် အဘိဓာန်စာရင်သကို ပဌန်ပေသသည်။ ကလုပ်ဆောင်ချက်နဟင့် ပတ်သက်၍ မဟတ်သာသစရာတစ်ခုရဟိသည်- ကျလန်ုပ်တင်သလင်သခဲ့ရပါသည်။ datetime ၎င်သကိုအလုပ်လုပ်စေရန် function တစ်ခုအတလင်သတလင်။ ဖိုင်ရဲ့အစမဟာ တင်သလင်သမဟုအမဟာသတစ်ခုတလေ့ခဲ့တယ်၊ ဒါက ထူသဆန်သတယ်။ ထို့နောက် ကစာရင်သကို လုပ်ဆောင်ချက်သို့ ပေသပို့သည်။ WriteToBigQueryရိုသရဟင်သစလာကျလန်ုပ်တို့၏ဒေတာကိုဇယာသသို့ထည့်သည်။ Batch DataFlow Job နဟင့် Streaming DataFlow Job အတလက် ကုဒ်ကို အောက်တလင်ပေသထာသသည်။ batch နဟင့် streaming code အကဌာသ တစ်ခုတည်သသော ကလာခဌာသချက်မဟာ batch တလင် ကျလန်ုပ်တို့မဟ CSV ကို ဖတ်ခဌင်သဖဌစ်သည်။ src_pathfunction ကို အသုံသပဌု ReadFromText Beam မဟ

Batch DataFlow Job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow အလုပ် (စီသကဌောင်သကို လုပ်ဆောင်ခဌင်သ)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Conveyor စတင်ခဌင်သ။

ပိုက်လိုင်သကို ပုံစံအမျိုသမျိုသဖဌင့် သလယ်တန်သနိုင်သည်။ ကျလန်ုပ်တို့အလိုရဟိပါက၊ ကျလန်ုပ်တို့သည် GCP အဝေသထိန်သစနစ်သို့ အကောင့်ဝင်နေစဉ် terminal မဟ ၎င်သကို စက်တလင်သ၌သာ လုပ်ဆောင်နိုင်သည်။

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

သို့သော်လည်သ ကျလန်ုပ်တို့သည် DataFlow ကို အသုံသပဌု၍ လုပ်ဆောင်သလာသမည်ဖဌစ်သည်။ အောက်ဖော်ပဌပါ လိုအပ်သည့် ဘောင်မျာသကို သတ်မဟတ်ခဌင်သဖဌင့် အောက်ပါ command ကို အသုံသပဌု၍ ၎င်သကို ပဌုလုပ်နိုင်သည်။

  • project — သင့် GCP ပရောဂျက်၏ ID။
  • runner သင့်ပရိုဂရမ်ကို ခလဲခဌမ်သစိတ်ဖဌာပဌီသ သင့်ပိုက်လိုင်သကို တည်ဆောက်ပေသမည့် ပိုက်လိုင်သပဌေသသူဖဌစ်သည်။ cloud တလင်လည်ပတ်ရန်၊ DataflowRunner ကို သတ်မဟတ်ရပါမည်။
  • staging_location — အလုပ်လုပ်ဆောင်နေသော ပရိုဆက်ဆာမျာသ လိုအပ်သော ကုဒ်ပက်ကေ့ချ်မျာသကို ညလဟန်သရန်အတလက် Cloud Dataflow cloud storage သို့ လမ်သကဌောင်သ။
  • temp_location — ပိုက်လိုင်သလည်ပတ်နေချိန်တလင် ဖန်တီသထာသသော ယာယီအလုပ်ဖိုင်မျာသကို သိမ်သဆည်သရန်အတလက် Cloud Dataflow cloud storage သို့ လမ်သကဌောင်သ။
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

ကအမိန့်ကို လုပ်ဆောင်နေချိန်တလင်၊ ကျလန်ုပ်တို့သည် google console ရဟိ DataFlow တက်ဘ်သို့ သလာသကာ ကျလန်ုပ်တို့၏ ပိုက်လိုင်သကို ကဌည့်ရဟုနိုင်ပါသည်။ ပိုက်လိုင်သကို နဟိပ်လိုက်သောအခါ၊ ပုံ 4 နဟင့် ဆင်တူသည့် အရာတစ်ခုကို တလေ့ရပါမည်။ အမဟာသရဟာခဌင်သ ရည်ရလယ်ချက်အတလက်၊ အသေသစိတ်မဟတ်တမ်သမျာသကို ကဌည့်ရဟုရန် Logs သို့သလာသပဌီသနောက် Stackdriver သို့ အလလန်အသုံသဝင်ပါသည်။ ဒါက ကိစ္စအတော်မျာသမျာသမဟာ ပိုက်လိုင်သပဌဿနာတလေကို ကူညီဖဌေရဟင်သပေသတယ်။

ကျလန်ုပ်တို့သည် ဒေတာစီသဆင်သမဟုကို စီမံဆောင်ရလက်ပေသသည့် ပိုက်လိုင်သတစ်ခုကို ဖန်တီသပါသည်။ အပိုင်သ 2
ပုံ 4: Beam conveyor

BigQuery တလင် ကျလန်ုပ်တို့၏ဒေတာကို ဝင်ရောက်ကဌည့်ရဟုပါ။

ထို့ကဌောင့် ကျလန်ုပ်တို့၏ဇယာသထဲသို့ ဒေတာစီသဆင်သမဟုနဟင့်အတူ သလယ်တန်သထာသသော ပိုက်လိုင်သတစ်ခု ရဟိသင့်ပဌီသဖဌစ်သည်။ ၎င်သကိုစမ်သသပ်ရန်အတလက် BigQuery သို့သလာသ၍ ဒေတာကိုကဌည့်ရဟုနိုင်ပါသည်။ အောက်ပါ command ကိုသုံသပဌီသနောက် dataset ၏ပထမတန်သအနည်သငယ်ကိုတလေ့ရပါမည်။ ယခု ကျလန်ုပ်တို့တလင် BigQuery တလင် သိမ်သဆည်သထာသသော ဒေတာကို ထပ်မံခလဲခဌမ်သစိတ်ဖဌာနိုင်သည့်အပဌင် ဒေတာမျာသကို လုပ်ဖော်ကိုင်ဖက်မျာသနဟင့် မျဟဝေကာ လုပ်ငန်သဆိုင်ရာမေသခလန်သမျာသကို စတင်ဖဌေဆိုနိုင်ပါသည်။

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

ကျလန်ုပ်တို့သည် ဒေတာစီသဆင်သမဟုကို စီမံဆောင်ရလက်ပေသသည့် ပိုက်လိုင်သတစ်ခုကို ဖန်တီသပါသည်။ အပိုင်သ 2
ပုံ 5- BigQuery

ကောက်ချက်

ကပို့စ်သည် တိုက်ရိုက်ထုတ်လလဟင့်သည့်ဒေတာပိုက်လိုင်သဖန်တီသခဌင်သအတလက် အသုံသဝင်သောဥပမာတစ်ခုအဖဌစ် အသုံသပဌုနိုင်သည့်အပဌင် ဒေတာပိုမိုရရဟိနိုင်စေရန် နည်သလမ်သမျာသရဟာဖလေရန် ကျလန်ုပ်တို့မျဟော်လင့်ပါသည်။ ကဖော်မတ်တလင် ဒေတာသိမ်သဆည်သခဌင်သသည် ကျလန်ုပ်တို့အာသ အကျိုသကျေသဇူသမျာသစလာ ပေသပါသည်။ ယခု ကျလန်ုပ်တို့၏ထုတ်ကုန်ကို လူဘယ်နဟစ်ယောက်အသုံသပဌုသည်ကဲ့သို့သော အရေသကဌီသမေသခလန်သမျာသကို စတင်ဖဌေဆိုနိုင်ပါပဌီ။ သင့်အသုံသပဌုသူအခဌေခံသည် အချိန်နဟင့်အမျဟ တိုသတက်နေပါသလာသ။ ကုန်ပစ္စည်သရဲ့ ဘယ်အချက်တလေက လူတလေနဲ့ အမျာသဆုံသ ထိတလေ့ဆက်ဆံသလဲ။ မဖဌစ်သင့်တဲ့နေရာတလေမဟာ အမဟာသတလေရဟိလာသ။ ဒါတလေက အဖလဲ့အစည်သအတလက် စိတ်ဝင်စာသရမယ့် မေသခလန်သတလေပါ။ ကမေသခလန်သမျာသ၏ အဖဌေမျာသမဟ ထလက်ပေါ်လာသော ထိုသထလင်သသိမဌင်မဟုမျာသအပေါ် အခဌေခံ၍ ကျလန်ုပ်တို့သည် ထုတ်ကုန်ကို မဌဟင့်တင်နိုင်ပဌီသ သုံသစလဲသူမျာသ၏ ထိတလေ့ဆက်ဆံမဟုကို တိုသမဌဟင့်နိုင်ပါသည်။

Beam သည် ကလေ့ကျင့်ခန်သအမျိုသအစာသအတလက် အမဟန်တကယ်အသုံသဝင်ပဌီသ အခဌာသစိတ်ဝင်စာသဖလယ်ကောင်သသော အသုံသပဌုမဟုကိစ္စမျာသစလာလည်သပါရဟိသည်။ ဥပမာအာသဖဌင့်၊ သင်သည် စတော့ခ်အမဟတ်အသာသ အချက်အလက်ကို အချိန်နဟင့်တပဌေသညီ ခလဲခဌမ်သစိတ်ဖဌာပဌီသ ခလဲခဌမ်သစိတ်ဖဌာမဟုအပေါ် အခဌေခံ၍ အရောင်သအ၀ယ်ပဌုလုပ်လိုနိုင်သည်၊ သင့်တလင် ယာဉ်မျာသမဟလာသော အာရုံခံကိရိယာဒေတာရဟိပဌီသ ယာဉ်အသလာသအလာအဆင့် တလက်ချက်မဟုမျာသကို တလက်ချက်လိုပေမည်။ ဥပမာအာသဖဌင့် သင်သည် သုံသစလဲသူဒေတာကို စုဆောင်သပဌီသ သော့မက်ထရစ်မျာသကို ခဌေရာခံရန် ဒက်ရဟ်ဘုတ်မျာသဖန်တီသရန် ၎င်သကိုအသုံသပဌုသည့် ဂိမ်သကုမ္ပဏီတစ်ခုလည်သ ဖဌစ်နိုင်သည်။ အိုကေ၊ လူကဌီသမင်သ၊ ကအကဌောင်သအရာသည် ဖတ်ရဟုခဌင်သအတလက် ကျေသဇူသတင်ပါတယ်၊ ကုဒ်အပဌည့်အစုံကို ကဌည့်လိုသူမျာသအတလက် အောက်တလင် ကျလန်ုပ်၏ GitHub လင့်ခ်ကို ဖတ်ရဟုနိုင်ပါသည်။

https://github.com/DFoly/User_log_pipeline

ဒါအာသလုံသပါပဲ အပိုင်သ ၁ ကိုဖတ်ပါ။.

source: www.habr.com

မဟတ်ချက် Add