ආයුබෝවන් සියල්ලටම. අපි පාඨමාලාවේ සිසුන් සඳහා විශේෂයෙන් සකස් කරන ලද ලිපියේ අවසාන කොටසෙහි පරිවර්තනය බෙදා ගනිමු.
තත්ය කාලීන නල මාර්ග සඳහා Apache Beam සහ DataFlow
Google Cloud පිහිටුවීම
සටහන: මම Python 3 හි නල මාර්ගය ධාවනය කිරීමේදී ගැටළු ඇති නිසා නල මාර්ගය ධාවනය කිරීමට සහ අභිරුචි ලොග් දත්ත ප්රකාශ කිරීමට Google Cloud Shell භාවිතා කළෙමි. Google Cloud Shell Apache Beam සමඟ වඩාත් අනුකූල වන Python 2 භාවිතා කරයි.
නල මාර්ගය ආරම්භ කිරීම සඳහා, අපි සැකසුම් වලට ටිකක් හාරා ගත යුතුය. ඔබ මින් පෙර GCP භාවිතා නොකළ අය සඳහා, ඔබට මෙහි දක්වා ඇති පහත පියවර 6 අනුගමනය කිරීමට අවශ්ය වනු ඇත
මෙයින් පසු, අපට අපගේ ස්ක්රිප්ට් Google Cloud Storage වෙත උඩුගත කර ඒවා අපගේ Google Cloud Shel වෙත පිටපත් කිරීමට අවශ්ය වනු ඇත. වලාකුළු ආචයනයට උඩුගත කිරීම ඉතා සුළු දෙයකි (විස්තරයක් සොයාගත හැකිය
2 රූපය
ගොනු පිටපත් කර අවශ්ය පුස්තකාල ස්ථාපනය කිරීමට අපට අවශ්ය විධාන පහත දැක්වේ.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
අපගේ දත්ත සමුදාය සහ වගුව නිර්මාණය කිරීම
අපි සියලු පිහිටුවීම් සම්බන්ධ පියවර සම්පූර්ණ කළ පසු, අපට කළ යුතු ඊළඟ දෙය වන්නේ BigQuery හි දත්ත කට්ටලයක් සහ වගුවක් නිර්මාණය කිරීමයි. මෙය කිරීමට ක්රම කිහිපයක් ඇත, නමුත් සරලම දෙය නම් දත්ත කට්ටලයක් සෑදීමෙන් Google Cloud කොන්සෝලය භාවිතා කිරීමයි. ඔබට පහත පියවර අනුගමනය කළ හැක
රූපය 3. වගු සැකැස්ම
පරිශීලක ලොග් දත්ත ප්රකාශයට පත් කිරීම
Pub/Sub යනු අපගේ නල මාර්ගයේ තීරණාත්මක අංගයකි, මන්ද එය බහු ස්වාධීන යෙදුම් එකිනෙකා සමඟ සන්නිවේදනය කිරීමට ඉඩ සලසයි. විශේෂයෙන්, එය යෙදුම් අතර පණිවිඩ යැවීමට සහ ලැබීමට ඉඩ සලසන අතරමැදියෙකු ලෙස ක්රියා කරයි. අපි කළ යුතු පළමු දෙය නම් මාතෘකාවක් නිර්මාණය කිරීමයි. කොන්සෝලයේ Pub/Sub වෙත ගොස් CREATE TOPIC ක්ලික් කරන්න.
පහත කේතය ඉහත නිර්වචනය කර ඇති ලොග් දත්ත උත්පාදනය කිරීමට අපගේ ස්ක්රිප්ටය අමතා පසුව සම්බන්ධ කර ලොග Pub/Sub වෙත යවයි. අපි කළ යුතු එකම දෙය වස්තුවක් නිර්මාණය කිරීමයි PublisherClient, ක්රමය භාවිතා කරමින් මාතෘකාවට මාර්ගය සඳහන් කරන්න topic_path
සහ කාර්යය අමතන්න publish
с topic_path
සහ දත්ත. අපි ආනයනය කරන බව කරුණාවෙන් සලකන්න generate_log_line
අපේ පිටපතෙන් stream_logs
, එබැවින් මෙම ගොනු එකම ෆෝල්ඩරයේ ඇති බවට වග බලා ගන්න, එසේ නොමැතිනම් ඔබට ආනයන දෝෂයක් ලැබෙනු ඇත. එවිට අපට මෙය අපගේ ගූගල් කොන්සෝලය හරහා ධාවනය කළ හැක:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
ගොනුව ක්රියාත්මක වූ වහාම, පහත රූපයේ දැක්වෙන පරිදි, කොන්සෝලය වෙත ලොග් දත්ත ප්රතිදානය අපට දැකගත හැකි වනු ඇත. අපි භාවිතා නොකරන තාක් කල් මෙම ස්ක්රිප්ට් එක ක්රියා කරයි CTRL + Cඑය සම්පූර්ණ කිරීමට.
රූපය 4. ප්රතිදානය publish_logs.py
අපගේ නල මාර්ග කේතය ලිවීම
දැන් අපි සියල්ල සූදානම් කර ඇති බැවින්, අපට විනෝදජනක කොටස ආරම්භ කළ හැකිය - බීම් සහ පයිතන් භාවිතයෙන් අපගේ නල මාර්ගය කේතනය කිරීම. බීම් නල මාර්ගයක් නිර්මාණය කිරීම සඳහා, අපි නල මාර්ගයේ වස්තුවක් (p) නිර්මාණය කළ යුතුය. අපි නල මාර්ග වස්තුවක් නිර්මාණය කළ පසු, අපට ක්රියාකරු භාවිතයෙන් එකින් එක කාර්යයන් කිහිපයක් යෙදිය හැකිය pipe (|)
. සාමාන්යයෙන්, කාර්ය ප්රවාහය පහත රූපයේ පරිදි පෙනේ.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
අපගේ කේතය තුළ, අපි අභිරුචි කාර්යයන් දෙකක් සාදන්නෙමු. කාර්යය regex_clean
, එය දත්ත පරිලෝකනය කර ශ්රිතය භාවිතයෙන් PATTERNS ලැයිස්තුව මත පදනම්ව අනුරූප පේළිය ලබා ගනී re.search
. ශ්රිතය කොමාවෙන් වෙන් වූ තන්තුවක් ලබා දෙයි. ඔබ නිත්ය ප්රකාශන විශේෂඥයෙක් නොවේ නම්, මෙය පරීක්ෂා කිරීමට මම නිර්දේශ කරමි datetime
එය වැඩ කිරීමට ශ්රිතයක් ඇතුලේ. මම ගොනුවේ ආරම්භයේදීම ආයාත දෝෂයක් ලබමින් සිටියෙමි, එය අමුතුයි. මෙම ලැයිස්තුව පසුව කාර්යයට යවනු ලැබේ WriteToBigQuery, එය සරලව අපගේ දත්ත වගුවට එක් කරයි. Batch DataFlow Job සහ Streaming DataFlow Job සඳහා වන කේතය පහත දක්වා ඇත. කණ්ඩායම සහ ප්රවාහ කේතය අතර ඇති එකම වෙනස වන්නේ කණ්ඩායම තුළ අපි CSV කියවීමයි src_path
කාර්යය භාවිතා කරමින් ReadFromText
බීම් සිට.
Batch DataFlow Job (කාණ්ඩ සැකසීම)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow Job (ප්රවාහ සැකසීම)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
නල මාර්ගය ධාවනය කිරීම
අපට විවිධ ආකාරවලින් නල මාර්ගය ධාවනය කළ හැකිය. අපට අවශ්ය නම්, දුරස්ථව GCP වෙත ලොග් වීමේදී අපට එය ටර්මිනලයකින් දේශීයව ධාවනය කළ හැකිය.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
කෙසේ වෙතත්, අපි එය DataFlow භාවිතයෙන් ධාවනය කරන්නෙමු. පහත දැක්වෙන අවශ්ය පරාමිති සැකසීමෙන් පහත විධානය භාවිතයෙන් අපට මෙය කළ හැකිය.
project
— ඔබේ GCP ව්යාපෘතියේ ID.runner
ඔබේ වැඩසටහන විශ්ලේෂණය කර ඔබේ නල මාර්ගය ගොඩනඟන නල මාර්ග ධාවකයකි. වලාකුළෙහි ධාවනය කිරීමට, ඔබ DataflowRunner එකක් සඳහන් කළ යුතුය.staging_location
— කාර්යය ඉටු කරන ප්රොසෙසරවලට අවශ්ය කේත පැකේජ සුචිගත කිරීම සඳහා Cloud Dataflow cloud storage වෙත යන මාර්ගය.temp_location
— නල මාර්ගය ක්රියාත්මක වන විට සාදන ලද තාවකාලික රැකියා ලිපිගොනු ගබඩා කිරීම සඳහා Cloud Dataflow cloud storage වෙත මාර්ගය.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
මෙම විධානය ක්රියාත්මක වන අතරතුර, අපට ගූගල් කොන්සෝලයේ ඇති DataFlow ටැබයට ගොස් අපගේ නල මාර්ගය නැරඹිය හැකිය. අපි නල මාර්ගය මත ක්ලික් කරන විට, අපට රූපය 4 ට සමාන යමක් දැකිය යුතුය. නිදොස් කිරීමේ අරමුණු සඳහා, සවිස්තරාත්මක ලඝු-සටහන් බැලීම සඳහා ලොග් වෙත ගොස් Stackdriver වෙත යාම ඉතා ප්රයෝජනවත් වේ. මෙය අවස්ථා ගණනාවකදී නල මාර්ග ගැටළු විසඳීමට මට උපකාර කර ඇත.
රූපය 4: කදම්භ වාහකය
BigQuery හි අපගේ දත්ත වෙත ප්රවේශ වන්න
එබැවින්, අපගේ වගුව තුළට දත්ත ගලා යන නල මාර්ගයක් අපට දැනටමත් තිබිය යුතුය. මෙය පරීක්ෂා කිරීම සඳහා, අපට BigQuery වෙත ගොස් දත්ත දෙස බැලිය හැකිය. පහත විධානය භාවිතා කිරීමෙන් පසු ඔබ දත්ත කට්ටලයේ පළමු පේළි කිහිපය දැකිය යුතුය. දැන් අපි BigQuery තුළ දත්ත ගබඩා කර ඇති බැවින්, අපට වැඩිදුර විශ්ලේෂණය කිරීමට මෙන්ම සගයන් සමඟ දත්ත බෙදා ගැනීමට සහ ව්යාපාරික ප්රශ්නවලට පිළිතුරු දීමට පටන් ගත හැකිය.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
රූපය 5: BigQuery
නිගමනය
මෙම සටහන ප්රවාහ දත්ත නල මාර්ගයක් නිර්මාණය කිරීම මෙන්ම දත්ත වඩාත් ප්රවේශ විය හැකි ක්රම සොයා ගැනීම සඳහා ප්රයෝජනවත් උදාහරණයක් ලෙස සේවය කරනු ඇතැයි අපි බලාපොරොත්තු වෙමු. මෙම ආකෘතියේ දත්ත ගබඩා කිරීම අපට බොහෝ වාසි ලබා දෙයි. දැන් අපට අපගේ නිෂ්පාදනය භාවිතා කරන පුද්ගලයින් කී දෙනෙක් වැනි වැදගත් ප්රශ්නවලට පිළිතුරු දීමට පටන් ගත හැකිය. කාලයත් සමඟ ඔබේ පරිශීලක පදනම වර්ධනය වේද? මිනිසුන් වැඩිපුරම අන්තර්ක්රියා කරන්නේ නිෂ්පාදනයේ කුමන අංශද? අනික නොවිය යුතු තැන්වල වැරදි තියෙනවද? සංවිධානයට උනන්දුවක් දක්වන ප්රශ්න මේවාය. මෙම ප්රශ්නවලට පිළිතුරුවලින් මතුවන තීක්ෂ්ණ බුද්ධිය මත පදනම්ව, අපට නිෂ්පාදනය වැඩිදියුණු කිරීමට සහ පරිශීලක නියැලීම වැඩි කිරීමට හැකිය.
මෙම ආකාරයේ ව්යායාම සඳහා බීම් ඇත්තෙන්ම ප්රයෝජනවත් වන අතර තවත් රසවත් භාවිත අවස්ථා ගණනාවක්ද ඇත. උදාහරණයක් ලෙස, ඔබට තථ්ය කාලය තුළ කොටස් ටික් දත්ත විශ්ලේෂණය කිරීමට සහ විශ්ලේෂණය මත පදනම්ව වෙළඳාම් කිරීමට ඔබට අවශ්ය විය හැකිය, සමහරවිට ඔබට වාහනවලින් එන සංවේදක දත්ත ඇති අතර ගමනාගමන මට්ටමේ ගණනය කිරීම් ගණනය කිරීමට අවශ්ය වේ. ඔබට, උදාහරණයක් ලෙස, පරිශීලක දත්ත එකතු කරන සහ යතුරු ප්රමිතික හඹා යාමට උපකරණ පුවරු සෑදීමට එය භාවිතා කරන ක්රීඩා සමාගමක් විය හැකිය. හරි මහත්තයෝ මේක වෙන පෝස්ට් එකකට මාතෘකාවක්, කියෙව්වට ස්තුතියි, සම්පූර්ණ කෝඩ් එක බලන්න කැමති අයට මගේ GitHub එකට ලින්ක් එක පහලින් තියෙනවා.
එපමණයි.
මූලාශ්රය: www.habr.com