ಎಲ್ಲರಿಗು ನಮಸ್ಖರ. ನಾವು ಲೇಖನದ ಅಂತಿಮ ಭಾಗದ ಅನುವಾದವನ್ನು ಹಂಚಿಕೊಳ್ಳುತ್ತಿದ್ದೇವೆ, ನಿರ್ದಿಷ್ಟವಾಗಿ ಕೋರ್ಸ್ನ ವಿದ್ಯಾರ್ಥಿಗಳಿಗೆ ಸಿದ್ಧಪಡಿಸಲಾಗಿದೆ.
ರಿಯಲ್-ಟೈಮ್ ಪೈಪ್ಲೈನ್ಗಳಿಗಾಗಿ ಅಪಾಚೆ ಬೀಮ್ ಮತ್ತು ಡೇಟಾಫ್ಲೋ
Google ಮೇಘವನ್ನು ಹೊಂದಿಸಲಾಗುತ್ತಿದೆ
ಗಮನಿಸಿ: ಪೈಥಾನ್ 3 ರಲ್ಲಿ ಪೈಪ್ಲೈನ್ ಅನ್ನು ಚಾಲನೆ ಮಾಡುವಲ್ಲಿ ನನಗೆ ತೊಂದರೆಯಿರುವ ಕಾರಣ ಪೈಪ್ಲೈನ್ ಅನ್ನು ರನ್ ಮಾಡಲು ಮತ್ತು ಕಸ್ಟಮ್ ಲಾಗ್ ಡೇಟಾವನ್ನು ಪ್ರಕಟಿಸಲು ನಾನು Google ಕ್ಲೌಡ್ ಶೆಲ್ ಅನ್ನು ಬಳಸಿದ್ದೇನೆ. ಗೂಗಲ್ ಕ್ಲೌಡ್ ಶೆಲ್ ಪೈಥಾನ್ 2 ಅನ್ನು ಬಳಸುತ್ತದೆ, ಇದು ಅಪಾಚೆ ಬೀಮ್ನೊಂದಿಗೆ ಹೆಚ್ಚು ಸ್ಥಿರವಾಗಿರುತ್ತದೆ.
ಪೈಪ್ಲೈನ್ ಅನ್ನು ಪ್ರಾರಂಭಿಸಲು, ನಾವು ಸೆಟ್ಟಿಂಗ್ಗಳಲ್ಲಿ ಸ್ವಲ್ಪ ಡಿಗ್ ಮಾಡಬೇಕಾಗಿದೆ. ನಿಮ್ಮಲ್ಲಿ ಮೊದಲು GCP ಅನ್ನು ಬಳಸದೆ ಇರುವವರಿಗೆ, ಇದರಲ್ಲಿ ವಿವರಿಸಿರುವ ಕೆಳಗಿನ 6 ಹಂತಗಳನ್ನು ನೀವು ಅನುಸರಿಸಬೇಕಾಗುತ್ತದೆ
ಇದರ ನಂತರ, ನಾವು ನಮ್ಮ ಸ್ಕ್ರಿಪ್ಟ್ಗಳನ್ನು Google ಮೇಘ ಸಂಗ್ರಹಣೆಗೆ ಅಪ್ಲೋಡ್ ಮಾಡಬೇಕಾಗುತ್ತದೆ ಮತ್ತು ಅವುಗಳನ್ನು ನಮ್ಮ Google ಕ್ಲೌಡ್ ಶೆಲ್ಗೆ ನಕಲಿಸಬೇಕಾಗುತ್ತದೆ. ಕ್ಲೌಡ್ ಸಂಗ್ರಹಣೆಗೆ ಅಪ್ಲೋಡ್ ಮಾಡುವುದು ತುಂಬಾ ಕ್ಷುಲ್ಲಕವಾಗಿದೆ (ವಿವರಣೆಯನ್ನು ಕಾಣಬಹುದು
ಚಿತ್ರ 2
ನಾವು ಫೈಲ್ಗಳನ್ನು ನಕಲಿಸಲು ಮತ್ತು ಅಗತ್ಯವಿರುವ ಲೈಬ್ರರಿಗಳನ್ನು ಸ್ಥಾಪಿಸಲು ಅಗತ್ಯವಿರುವ ಆಜ್ಞೆಗಳನ್ನು ಕೆಳಗೆ ಪಟ್ಟಿ ಮಾಡಲಾಗಿದೆ.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
ನಮ್ಮ ಡೇಟಾಬೇಸ್ ಮತ್ತು ಟೇಬಲ್ ಅನ್ನು ರಚಿಸುವುದು
ಒಮ್ಮೆ ನಾವು ಎಲ್ಲಾ ಸೆಟಪ್ ಸಂಬಂಧಿತ ಹಂತಗಳನ್ನು ಪೂರ್ಣಗೊಳಿಸಿದ ನಂತರ, ನಾವು ಮಾಡಬೇಕಾದ ಮುಂದಿನ ಕೆಲಸವೆಂದರೆ BigQuery ನಲ್ಲಿ ಡೇಟಾಸೆಟ್ ಮತ್ತು ಟೇಬಲ್ ಅನ್ನು ರಚಿಸುವುದು. ಇದನ್ನು ಮಾಡಲು ಹಲವಾರು ಮಾರ್ಗಗಳಿವೆ, ಆದರೆ ಮೊದಲು ಡೇಟಾಸೆಟ್ ಅನ್ನು ರಚಿಸುವ ಮೂಲಕ Google ಕ್ಲೌಡ್ ಕನ್ಸೋಲ್ ಅನ್ನು ಬಳಸುವುದು ಸರಳವಾಗಿದೆ. ನೀವು ಕೆಳಗಿನ ಹಂತಗಳನ್ನು ಅನುಸರಿಸಬಹುದು
ಚಿತ್ರ 3. ಟೇಬಲ್ ಲೇಔಟ್
ಬಳಕೆದಾರರ ಲಾಗ್ ಡೇಟಾವನ್ನು ಪ್ರಕಟಿಸಲಾಗುತ್ತಿದೆ
ಪಬ್/ಸಬ್ ನಮ್ಮ ಪೈಪ್ಲೈನ್ನ ನಿರ್ಣಾಯಕ ಅಂಶವಾಗಿದೆ ಏಕೆಂದರೆ ಇದು ಬಹು ಸ್ವತಂತ್ರ ಅಪ್ಲಿಕೇಶನ್ಗಳನ್ನು ಪರಸ್ಪರ ಸಂವಹನ ಮಾಡಲು ಅನುಮತಿಸುತ್ತದೆ. ನಿರ್ದಿಷ್ಟವಾಗಿ ಹೇಳುವುದಾದರೆ, ಅಪ್ಲಿಕೇಶನ್ಗಳ ನಡುವೆ ಸಂದೇಶಗಳನ್ನು ಕಳುಹಿಸಲು ಮತ್ತು ಸ್ವೀಕರಿಸಲು ನಮಗೆ ಅನುಮತಿಸುವ ಮಧ್ಯವರ್ತಿಯಾಗಿ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ. ನಾವು ಮಾಡಬೇಕಾದ ಮೊದಲ ವಿಷಯವೆಂದರೆ ವಿಷಯವನ್ನು ರಚಿಸುವುದು. ಕನ್ಸೋಲ್ನಲ್ಲಿ ಪಬ್/ಸಬ್ಗೆ ಹೋಗಿ ಮತ್ತು ವಿಷಯವನ್ನು ರಚಿಸಿ ಕ್ಲಿಕ್ ಮಾಡಿ.
ಕೆಳಗಿನ ಕೋಡ್ ಮೇಲೆ ವಿವರಿಸಿದ ಲಾಗ್ ಡೇಟಾವನ್ನು ರಚಿಸಲು ನಮ್ಮ ಸ್ಕ್ರಿಪ್ಟ್ ಅನ್ನು ಕರೆಯುತ್ತದೆ ಮತ್ತು ನಂತರ ಲಾಗ್ಗಳನ್ನು ಪಬ್/ಸಬ್ಗೆ ಸಂಪರ್ಕಿಸುತ್ತದೆ ಮತ್ತು ಕಳುಹಿಸುತ್ತದೆ. ನಾವು ಮಾಡಬೇಕಾದ ಏಕೈಕ ವಿಷಯವೆಂದರೆ ವಸ್ತುವನ್ನು ರಚಿಸುವುದು ಪ್ರಕಾಶಕ ಗ್ರಾಹಕ, ವಿಧಾನವನ್ನು ಬಳಸಿಕೊಂಡು ವಿಷಯದ ಮಾರ್ಗವನ್ನು ಸೂಚಿಸಿ topic_path
ಮತ್ತು ಕಾರ್ಯವನ್ನು ಕರೆ ಮಾಡಿ publish
с topic_path
ಮತ್ತು ಡೇಟಾ. ನಾವು ಆಮದು ಮಾಡಿಕೊಳ್ಳುತ್ತೇವೆ ಎಂಬುದನ್ನು ದಯವಿಟ್ಟು ಗಮನಿಸಿ generate_log_line
ನಮ್ಮ ಲಿಪಿಯಿಂದ stream_logs
, ಆದ್ದರಿಂದ ಈ ಫೈಲ್ಗಳು ಒಂದೇ ಫೋಲ್ಡರ್ನಲ್ಲಿವೆ ಎಂದು ಖಚಿತಪಡಿಸಿಕೊಳ್ಳಿ, ಇಲ್ಲದಿದ್ದರೆ ನೀವು ಆಮದು ದೋಷವನ್ನು ಪಡೆಯುತ್ತೀರಿ. ನಂತರ ನಾವು ಇದನ್ನು ನಮ್ಮ Google ಕನ್ಸೋಲ್ ಮೂಲಕ ಚಲಾಯಿಸಬಹುದು:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
ಫೈಲ್ ರನ್ ಆದ ತಕ್ಷಣ, ಕೆಳಗಿನ ಚಿತ್ರದಲ್ಲಿ ತೋರಿಸಿರುವಂತೆ ಕನ್ಸೋಲ್ಗೆ ಲಾಗ್ ಡೇಟಾದ ಔಟ್ಪುಟ್ ಅನ್ನು ನೋಡಲು ನಮಗೆ ಸಾಧ್ಯವಾಗುತ್ತದೆ. ನಾವು ಬಳಸದಿರುವವರೆಗೆ ಈ ಸ್ಕ್ರಿಪ್ಟ್ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ CTRL + Cಅದನ್ನು ಪೂರ್ಣಗೊಳಿಸಲು.
ಚಿತ್ರ 4. ಔಟ್ಪುಟ್ publish_logs.py
ನಮ್ಮ ಪೈಪ್ಲೈನ್ ಕೋಡ್ ಬರೆಯುವುದು
ಈಗ ನಾವು ಎಲ್ಲವನ್ನೂ ಸಿದ್ಧಪಡಿಸಿದ್ದೇವೆ, ನಾವು ಮೋಜಿನ ಭಾಗವನ್ನು ಪ್ರಾರಂಭಿಸಬಹುದು - ಬೀಮ್ ಮತ್ತು ಪೈಥಾನ್ ಬಳಸಿ ನಮ್ಮ ಪೈಪ್ಲೈನ್ ಕೋಡಿಂಗ್. ಬೀಮ್ ಪೈಪ್ಲೈನ್ ರಚಿಸಲು, ನಾವು ಪೈಪ್ಲೈನ್ ವಸ್ತುವನ್ನು (p) ರಚಿಸಬೇಕಾಗಿದೆ. ಒಮ್ಮೆ ನಾವು ಪೈಪ್ಲೈನ್ ಆಬ್ಜೆಕ್ಟ್ ಅನ್ನು ರಚಿಸಿದ ನಂತರ, ಆಪರೇಟರ್ ಅನ್ನು ಬಳಸಿಕೊಂಡು ನಾವು ಒಂದರ ನಂತರ ಒಂದರಂತೆ ಅನೇಕ ಕಾರ್ಯಗಳನ್ನು ಅನ್ವಯಿಸಬಹುದು pipe (|)
. ಸಾಮಾನ್ಯವಾಗಿ, ಕೆಲಸದ ಹರಿವು ಕೆಳಗಿನ ಚಿತ್ರದಂತೆ ಕಾಣುತ್ತದೆ.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
ನಮ್ಮ ಕೋಡ್ನಲ್ಲಿ, ನಾವು ಎರಡು ಕಸ್ಟಮ್ ಕಾರ್ಯಗಳನ್ನು ರಚಿಸುತ್ತೇವೆ. ಕಾರ್ಯ regex_clean
, ಇದು ಡೇಟಾವನ್ನು ಸ್ಕ್ಯಾನ್ ಮಾಡುತ್ತದೆ ಮತ್ತು ಕಾರ್ಯವನ್ನು ಬಳಸಿಕೊಂಡು ಪ್ಯಾಟರ್ನ್ಸ್ ಪಟ್ಟಿಯನ್ನು ಆಧರಿಸಿ ಅನುಗುಣವಾದ ಸಾಲನ್ನು ಹಿಂಪಡೆಯುತ್ತದೆ re.search
. ಕಾರ್ಯವು ಅಲ್ಪವಿರಾಮದಿಂದ ಬೇರ್ಪಟ್ಟ ಸ್ಟ್ರಿಂಗ್ ಅನ್ನು ಹಿಂತಿರುಗಿಸುತ್ತದೆ. ನೀವು ನಿಯಮಿತ ಅಭಿವ್ಯಕ್ತಿ ತಜ್ಞರಲ್ಲದಿದ್ದರೆ, ಇದನ್ನು ಪರಿಶೀಲಿಸಲು ನಾನು ಶಿಫಾರಸು ಮಾಡುತ್ತೇವೆ datetime
ಇದು ಕೆಲಸ ಮಾಡಲು ಒಂದು ಕಾರ್ಯದ ಒಳಗೆ. ಫೈಲ್ನ ಪ್ರಾರಂಭದಲ್ಲಿ ನಾನು ಆಮದು ದೋಷವನ್ನು ಪಡೆಯುತ್ತಿದ್ದೆ, ಅದು ವಿಚಿತ್ರವಾಗಿದೆ. ನಂತರ ಈ ಪಟ್ಟಿಯನ್ನು ಕಾರ್ಯಕ್ಕೆ ರವಾನಿಸಲಾಗುತ್ತದೆ WriteToBigQuery, ಇದು ಸರಳವಾಗಿ ನಮ್ಮ ಡೇಟಾವನ್ನು ಟೇಬಲ್ಗೆ ಸೇರಿಸುತ್ತದೆ. ಬ್ಯಾಚ್ ಡೇಟಾಫ್ಲೋ ಜಾಬ್ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾಫ್ಲೋ ಜಾಬ್ಗಾಗಿ ಕೋಡ್ ಅನ್ನು ಕೆಳಗೆ ನೀಡಲಾಗಿದೆ. ಬ್ಯಾಚ್ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಕೋಡ್ ನಡುವಿನ ವ್ಯತ್ಯಾಸವೆಂದರೆ ಬ್ಯಾಚ್ನಲ್ಲಿ ನಾವು CSV ಅನ್ನು ಓದುತ್ತೇವೆ src_path
ಕಾರ್ಯವನ್ನು ಬಳಸುವುದು ReadFromText
ಕಿರಣದಿಂದ.
ಬ್ಯಾಚ್ ಡೇಟಾಫ್ಲೋ ಜಾಬ್ (ಬ್ಯಾಚ್ ಪ್ರಕ್ರಿಯೆ)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾಫ್ಲೋ ಜಾಬ್ (ಸ್ಟ್ರೀಮ್ ಪ್ರಕ್ರಿಯೆ)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
ಕನ್ವೇಯರ್ ಅನ್ನು ಪ್ರಾರಂಭಿಸಲಾಗುತ್ತಿದೆ
ನಾವು ಪೈಪ್ಲೈನ್ ಅನ್ನು ವಿವಿಧ ರೀತಿಯಲ್ಲಿ ಚಲಾಯಿಸಬಹುದು. ನಾವು ಬಯಸಿದರೆ, ರಿಮೋಟ್ ಆಗಿ GCP ಗೆ ಲಾಗ್ ಇನ್ ಮಾಡುವಾಗ ನಾವು ಅದನ್ನು ಟರ್ಮಿನಲ್ನಿಂದ ಸ್ಥಳೀಯವಾಗಿ ಚಲಾಯಿಸಬಹುದು.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
ಆದಾಗ್ಯೂ, ನಾವು ಅದನ್ನು DataFlow ಬಳಸಿ ಚಲಾಯಿಸಲಿದ್ದೇವೆ. ಕೆಳಗಿನ ಅಗತ್ಯವಿರುವ ನಿಯತಾಂಕಗಳನ್ನು ಹೊಂದಿಸುವ ಮೂಲಕ ಕೆಳಗಿನ ಆಜ್ಞೆಯನ್ನು ಬಳಸಿಕೊಂಡು ನಾವು ಇದನ್ನು ಮಾಡಬಹುದು.
project
— ನಿಮ್ಮ GCP ಯೋಜನೆಯ ID.runner
ಪೈಪ್ಲೈನ್ ರನ್ನರ್ ಆಗಿದ್ದು ಅದು ನಿಮ್ಮ ಪ್ರೋಗ್ರಾಂ ಅನ್ನು ವಿಶ್ಲೇಷಿಸುತ್ತದೆ ಮತ್ತು ನಿಮ್ಮ ಪೈಪ್ಲೈನ್ ಅನ್ನು ನಿರ್ಮಿಸುತ್ತದೆ. ಕ್ಲೌಡ್ನಲ್ಲಿ ರನ್ ಮಾಡಲು, ನೀವು DataflowRunner ಅನ್ನು ನಿರ್ದಿಷ್ಟಪಡಿಸಬೇಕು.staging_location
— ಕೆಲಸವನ್ನು ನಿರ್ವಹಿಸುವ ಪ್ರೊಸೆಸರ್ಗಳಿಗೆ ಅಗತ್ಯವಿರುವ ಕೋಡ್ ಪ್ಯಾಕೇಜ್ಗಳನ್ನು ಸೂಚಿಕೆ ಮಾಡಲು ಕ್ಲೌಡ್ ಡೇಟಾಫ್ಲೋ ಕ್ಲೌಡ್ ಸಂಗ್ರಹಣೆಯ ಮಾರ್ಗ.temp_location
— ಪೈಪ್ಲೈನ್ ಚಾಲನೆಯಲ್ಲಿರುವಾಗ ರಚಿಸಲಾದ ತಾತ್ಕಾಲಿಕ ಉದ್ಯೋಗ ಫೈಲ್ಗಳನ್ನು ಸಂಗ್ರಹಿಸಲು ಕ್ಲೌಡ್ ಡೇಟಾಫ್ಲೋ ಕ್ಲೌಡ್ ಸಂಗ್ರಹಣೆಗೆ ಮಾರ್ಗ.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
ಈ ಆಜ್ಞೆಯು ಚಾಲನೆಯಲ್ಲಿರುವಾಗ, ನಾವು google console ನಲ್ಲಿ DataFlow ಟ್ಯಾಬ್ಗೆ ಹೋಗಬಹುದು ಮತ್ತು ನಮ್ಮ ಪೈಪ್ಲೈನ್ ಅನ್ನು ವೀಕ್ಷಿಸಬಹುದು. ನಾವು ಪೈಪ್ಲೈನ್ನಲ್ಲಿ ಕ್ಲಿಕ್ ಮಾಡಿದಾಗ, ಚಿತ್ರ 4 ರಂತೆಯೇ ನಾವು ಏನನ್ನಾದರೂ ನೋಡಬೇಕು. ಡೀಬಗ್ ಮಾಡುವ ಉದ್ದೇಶಗಳಿಗಾಗಿ, ವಿವರವಾದ ಲಾಗ್ಗಳನ್ನು ವೀಕ್ಷಿಸಲು ಲಾಗ್ಗಳಿಗೆ ಮತ್ತು ನಂತರ ಸ್ಟಾಕ್ಡ್ರೈವರ್ಗೆ ಹೋಗಲು ಇದು ತುಂಬಾ ಸಹಾಯಕವಾಗಬಹುದು. ಇದು ಹಲವಾರು ಸಂದರ್ಭಗಳಲ್ಲಿ ಪೈಪ್ಲೈನ್ ಸಮಸ್ಯೆಗಳನ್ನು ಪರಿಹರಿಸಲು ನನಗೆ ಸಹಾಯ ಮಾಡಿದೆ.
ಚಿತ್ರ 4: ಬೀಮ್ ಕನ್ವೇಯರ್
BigQuery ನಲ್ಲಿ ನಮ್ಮ ಡೇಟಾವನ್ನು ಪ್ರವೇಶಿಸಿ
ಆದ್ದರಿಂದ, ನಾವು ಈಗಾಗಲೇ ನಮ್ಮ ಟೇಬಲ್ಗೆ ಡೇಟಾ ಹರಿಯುವ ಪೈಪ್ಲೈನ್ ಅನ್ನು ಹೊಂದಿರಬೇಕು. ಇದನ್ನು ಪರೀಕ್ಷಿಸಲು, ನಾವು BigQuery ಗೆ ಹೋಗಬಹುದು ಮತ್ತು ಡೇಟಾವನ್ನು ನೋಡಬಹುದು. ಕೆಳಗಿನ ಆಜ್ಞೆಯನ್ನು ಬಳಸಿದ ನಂತರ ನೀವು ಡೇಟಾಸೆಟ್ನ ಮೊದಲ ಕೆಲವು ಸಾಲುಗಳನ್ನು ನೋಡಬೇಕು. ಈಗ ನಾವು BigQuery ನಲ್ಲಿ ಡೇಟಾವನ್ನು ಸಂಗ್ರಹಿಸಿದ್ದೇವೆ, ನಾವು ಹೆಚ್ಚಿನ ವಿಶ್ಲೇಷಣೆಯನ್ನು ನಡೆಸಬಹುದು, ಹಾಗೆಯೇ ಡೇಟಾವನ್ನು ಸಹೋದ್ಯೋಗಿಗಳೊಂದಿಗೆ ಹಂಚಿಕೊಳ್ಳಬಹುದು ಮತ್ತು ವ್ಯಾಪಾರ ಪ್ರಶ್ನೆಗಳಿಗೆ ಉತ್ತರಿಸಲು ಪ್ರಾರಂಭಿಸಬಹುದು.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
ಚಿತ್ರ 5: BigQuery
ತೀರ್ಮಾನಕ್ಕೆ
ಈ ಪೋಸ್ಟ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಪೈಪ್ಲೈನ್ ಅನ್ನು ರಚಿಸುವ ಉಪಯುಕ್ತ ಉದಾಹರಣೆಯಾಗಿ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ ಎಂದು ನಾವು ಭಾವಿಸುತ್ತೇವೆ, ಜೊತೆಗೆ ಡೇಟಾವನ್ನು ಹೆಚ್ಚು ಪ್ರವೇಶಿಸಲು ಮಾರ್ಗಗಳನ್ನು ಹುಡುಕುತ್ತೇವೆ. ಈ ಸ್ವರೂಪದಲ್ಲಿ ಡೇಟಾವನ್ನು ಸಂಗ್ರಹಿಸುವುದು ನಮಗೆ ಅನೇಕ ಪ್ರಯೋಜನಗಳನ್ನು ನೀಡುತ್ತದೆ. ನಮ್ಮ ಉತ್ಪನ್ನವನ್ನು ಎಷ್ಟು ಜನರು ಬಳಸುತ್ತಾರೆ ಎಂಬಂತಹ ಪ್ರಮುಖ ಪ್ರಶ್ನೆಗಳಿಗೆ ನಾವು ಈಗ ಉತ್ತರಿಸಲು ಪ್ರಾರಂಭಿಸಬಹುದು? ನಿಮ್ಮ ಬಳಕೆದಾರ ನೆಲೆಯು ಕಾಲಾನಂತರದಲ್ಲಿ ಬೆಳೆಯುತ್ತಿದೆಯೇ? ಉತ್ಪನ್ನದ ಯಾವ ಅಂಶಗಳೊಂದಿಗೆ ಜನರು ಹೆಚ್ಚು ಸಂವಹನ ನಡೆಸುತ್ತಾರೆ? ಮತ್ತು ಇರಬಾರದ ಸ್ಥಳದಲ್ಲಿ ದೋಷಗಳಿವೆಯೇ? ಈ ಪ್ರಶ್ನೆಗಳು ಸಂಸ್ಥೆಗೆ ಆಸಕ್ತಿಯನ್ನುಂಟುಮಾಡುತ್ತವೆ. ಈ ಪ್ರಶ್ನೆಗಳಿಗೆ ಉತ್ತರಗಳಿಂದ ಹೊರಹೊಮ್ಮುವ ಒಳನೋಟಗಳ ಆಧಾರದ ಮೇಲೆ, ನಾವು ಉತ್ಪನ್ನವನ್ನು ಸುಧಾರಿಸಬಹುದು ಮತ್ತು ಬಳಕೆದಾರರ ತೊಡಗಿಸಿಕೊಳ್ಳುವಿಕೆಯನ್ನು ಹೆಚ್ಚಿಸಬಹುದು.
ಬೀಮ್ ಈ ರೀತಿಯ ವ್ಯಾಯಾಮಕ್ಕೆ ನಿಜವಾಗಿಯೂ ಉಪಯುಕ್ತವಾಗಿದೆ ಮತ್ತು ಹಲವಾರು ಇತರ ಆಸಕ್ತಿದಾಯಕ ಬಳಕೆಯ ಸಂದರ್ಭಗಳನ್ನು ಹೊಂದಿದೆ. ಉದಾಹರಣೆಗೆ, ನೀವು ನೈಜ ಸಮಯದಲ್ಲಿ ಸ್ಟಾಕ್ ಟಿಕ್ ಡೇಟಾವನ್ನು ವಿಶ್ಲೇಷಿಸಲು ಮತ್ತು ವಿಶ್ಲೇಷಣೆಯ ಆಧಾರದ ಮೇಲೆ ವಹಿವಾಟುಗಳನ್ನು ಮಾಡಲು ಬಯಸಬಹುದು, ಬಹುಶಃ ನೀವು ವಾಹನಗಳಿಂದ ಬರುವ ಸಂವೇದಕ ಡೇಟಾವನ್ನು ಹೊಂದಿದ್ದೀರಿ ಮತ್ತು ಟ್ರಾಫಿಕ್ ಮಟ್ಟದ ಲೆಕ್ಕಾಚಾರಗಳನ್ನು ಲೆಕ್ಕಾಚಾರ ಮಾಡಲು ಬಯಸುತ್ತೀರಿ. ಉದಾಹರಣೆಗೆ, ನೀವು ಬಳಕೆದಾರರ ಡೇಟಾವನ್ನು ಸಂಗ್ರಹಿಸುವ ಮತ್ತು ಪ್ರಮುಖ ಮೆಟ್ರಿಕ್ಗಳನ್ನು ಟ್ರ್ಯಾಕ್ ಮಾಡಲು ಡ್ಯಾಶ್ಬೋರ್ಡ್ಗಳನ್ನು ರಚಿಸಲು ಅದನ್ನು ಬಳಸುವ ಗೇಮಿಂಗ್ ಕಂಪನಿಯಾಗಿರಬಹುದು. ಸರಿ, ಮಹನೀಯರೇ, ಇದು ಮತ್ತೊಂದು ಪೋಸ್ಟ್ಗೆ ವಿಷಯವಾಗಿದೆ, ಓದಿದ್ದಕ್ಕಾಗಿ ಧನ್ಯವಾದಗಳು, ಮತ್ತು ಸಂಪೂರ್ಣ ಕೋಡ್ ಅನ್ನು ನೋಡಲು ಬಯಸುವವರಿಗೆ, ನನ್ನ GitHub ಗೆ ಲಿಂಕ್ ಕೆಳಗೆ ಇದೆ.
ಅದು ಅಷ್ಟೆ.
ಮೂಲ: www.habr.com