ሰላም ሁላችሁም። በተለይ ለኮርሱ ተማሪዎች የተዘጋጀውን የጽሁፉን የመጨረሻ ክፍል ትርጉም እናካፍላለን
Apache Beam እና DataFlow ለእውነተኛ ጊዜ የቧንቧ መስመሮች
ጉግል ክላውድን በማዘጋጀት ላይ
ማስታወሻ፡ የቧንቧ መስመርን ለማስኬድ እና የተጠቃሚ ሎግ ዳታ ለማተም ጎግል ክላውድ ሼልን ተጠቀምኩ ምክንያቱም በፓይዘን 3 ውስጥ የቧንቧ መስመርን ለማስኬድ ችግር ስላጋጠመኝ ነው። ጎግል ክላውድ ሼል ከ Apache Beam ጋር የበለጠ የሚስማማውን Python 2 ይጠቀማል።
የቧንቧ መስመሩን ለማስኬድ, ወደ ቅንጅቶች ትንሽ መቆፈር አለብን. ከዚህ በፊት GCP ን ተጠቅማ ላላቹ፣ በዚህ ውስጥ የሚከተሉትን 6 ደረጃዎች ማጠናቀቅ አለባችሁ
ከዚያ በኋላ ስክሪፕቶቻችንን ወደ ጎግል ክላውድ ማከማቻ መስቀል እና ወደ ጎግል ክላውድ ሼል መቅዳት አለብን። ወደ ደመና ማከማቻ መስቀል በጣም ቀላል ነው (መግለጫ ሊገኝ ይችላል።
የ 2 ስዕል
ፋይሎቹን ለመቅዳት እና አስፈላጊውን ቤተ-መጽሐፍት ለመጫን የሚያስፈልጉን ትዕዛዞች ከዚህ በታች ተዘርዝረዋል.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
የእኛን የውሂብ ጎታ እና ጠረጴዛ መፍጠር
ሁሉንም የማዋቀር ደረጃዎች እንደጨረስን ቀጣዩ ነገር በBigQuery ውስጥ የውሂብ ስብስብ እና ሠንጠረዥ መፍጠር ነው። ይህንን ለማድረግ ብዙ መንገዶች አሉ ነገር ግን ቀላሉ መንገድ የውሂብ ስብስብ በመፍጠር ጎግል ክላውድ ኮንሶል መጠቀም ነው። በሚከተለው ውስጥ ያሉትን ደረጃዎች መከተል ይችላሉ
ምስል 3. የሰንጠረዥ እቅድ
የተጠቃሚ ምዝግብ ማስታወሻ ውሂብ ማተም
በርካታ ገለልተኛ አፕሊኬሽኖች እርስ በርስ እንዲግባቡ ስለሚያስችል ፐብ/ንኡስ የቧንቧ መስመራችን ወሳኝ አካል ነው። በተለይም በመተግበሪያዎች መካከል መልዕክቶችን እንድንልክ እና እንድንቀበል የሚያስችለን እንደ አማላጅ ሆኖ ይሰራል። ማድረግ ያለብን የመጀመሪያው ነገር ርዕስ (ርዕስ) መፍጠር ነው. በቀላሉ በኮንሶሉ ውስጥ ወደ አታሚ/ንዑስ ይሂዱ እና ርዕስ ፍጠርን ጠቅ ያድርጉ።
ከዚህ በታች ያለው ኮድ የእኛን ስክሪፕት በመደወል ከላይ የተገለጸውን የምዝግብ ማስታወሻ ውሂብ ያመነጫል እና ከዚያ ያገናኛል እና ምዝግብ ማስታወሻዎቹን ወደ pub/Sub ይልካል። እኛ ማድረግ ያለብን ነገር መፍጠር ብቻ ነው። አታሚ ደንበኛ, ዘዴውን በመጠቀም የገጽታውን መንገድ ይግለጹ topic_path
እና ተግባሩን ይደውሉ publish
с topic_path
እና ውሂብ. እባኮትን እያስመጣን ነው። generate_log_line
ከስክሪፕታችን stream_logs
ስለዚህ እነዚያ ፋይሎች በተመሳሳይ አቃፊ ውስጥ መሆናቸውን ያረጋግጡ አለበለዚያ የማስመጣት ስህተት ይደርስብዎታል. ይህንን በመጠቀም በ google ኮንሶል በኩል ማሄድ እንችላለን፡-
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
ፋይሉ አንዴ ከተሰራ በኋላ ከታች በስዕሉ ላይ እንደሚታየው የምዝግብ ማስታወሻውን ወደ ኮንሶሉ ላይ ያለውን ውጤት ለመመልከት እንችላለን. እስካልተጠቀምን ድረስ ይህ ስክሪፕት ይሰራል CTRL + Cለማጠናቀቅ.
ምስል 4. መደምደሚያ publish_logs.py
የእኛን የቧንቧ መስመር ኮድ መጻፍ
አሁን ሁሉንም ነገር አዘጋጅተናል፣ ወደ አዝናኝ ክፍል ልንሄድ እንችላለን - Beam እና Python በመጠቀም የቧንቧ መስመሮቻችንን ኮድ ማድረግ። የቢም ቧንቧን ለመፍጠር የቧንቧ መስመር ነገር (p) መፍጠር አለብን. የቧንቧ መስመር ነገርን ከፈጠርን በኋላ ኦፕሬተሩን በመጠቀም ብዙ ተግባራትን አንድ በአንድ መተግበር እንችላለን pipe (|)
. በአጠቃላይ የስራ ሂደቱ ከታች ያለውን ምስል ይመስላል.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
በእኛ ኮድ ውስጥ, በተጠቃሚ የተገለጹ ሁለት ተግባራትን እንፈጥራለን. ተግባር regex_clean
ተግባሩን በመጠቀም በPATTERNS ዝርዝር ላይ በመመስረት መረጃውን የሚቃኝ እና ተዛማጅ ሕብረቁምፊውን ያወጣል። re.search
. ተግባሩ በነጠላ ሰረዝ የተለየ ሕብረቁምፊ ይመልሳል። በመደበኛ አገላለጾች ላይ ኤክስፐርት ካልሆኑ, ይህንን እንዲመለከቱ እመክራለሁ. datetime
እንዲሰራ ለማድረግ በተግባሩ ውስጥ. በፋይሉ መጀመሪያ ላይ በማስመጣት ላይ ስህተት እያጋጠመኝ ነበር፣ ይህም እንግዳ ነበር። ይህ ዝርዝር ወደ ተግባሩ ይተላለፋል ቶቢግ መጠይቅ ይፃፉ, ይህም በቀላሉ የእኛን ውሂብ ወደ ጠረጴዛው ይጨምራል. የ Batch DataFlow የስራ እና የዥረት ዳታ ፍሰት ስራ ኮድ ከዚህ በታች ቀርቧል። በባች እና በዥረት ኮድ መካከል ያለው ብቸኛው ልዩነት በቡድን ውስጥ CSV ን እናነባለን። src_path
ተግባሩን በመጠቀም ReadFromText
ከቢም.
ባች የውሂብ ፍሰት ሥራ (የባች ሂደት)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
የውሂብ ፍሰት ስራ (የዥረት ሂደት)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
የቧንቧ መስመርን ማካሄድ
የቧንቧ መስመርን በተለያዩ መንገዶች መጀመር እንችላለን. ከፈለግን ወደ ጂሲፒ በርቀት በመግባት ከተርሚናል ልናስኬደው እንችላለን።
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
ሆኖም ግን በ DataFlow ልናስኬደው ነው። የሚከተሉትን አስፈላጊ መለኪያዎች በማዘጋጀት ከዚህ በታች ባለው ትእዛዝ ይህንን ማድረግ እንችላለን ።
project
- የእርስዎ GCP ፕሮጀክት መታወቂያ።runner
ፕሮግራምህን የሚተነተን እና ቧንቧህን የሚገነባ የቧንቧ መስመር ሯጭ ነው። በደመና ውስጥ ለማሄድ፣ DataflowRunnerን መጥቀስ አለብዎት።staging_location
ስራውን በሚያከናውኑት ተቆጣጣሪዎች የሚፈለጉትን የኮድ ፓኬጆችን ለመጠቆም ወደ የደመና ዳታ ፍሰት ደመና ማከማቻ መንገድ ነው።temp_location
- የቧንቧ መስመር በሚሠራበት ጊዜ የተፈጠሩ ጊዜያዊ የሥራ ፋይሎችን ለማስቀመጥ ወደ ደመና የውሂብ ፍሰት ደመና ማከማቻ መንገድ።streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
ይህ ትእዛዝ እየሰራ ሳለ በ google ኮንሶል ውስጥ ወደ DataFlow ትር ሄደን የቧንቧ መስመሮቻችንን ማየት እንችላለን። የቧንቧ መስመርን ጠቅ በማድረግ, ከስእል 4 ጋር ተመሳሳይ የሆነ ነገር ማየት አለብን. ለማረም ዓላማዎች, ወደ ሎግዎች እና ከዚያም ወደ Stackdriver በመሄድ ዝርዝር ምዝግብ ማስታወሻዎችን ለማየት በጣም ጠቃሚ ሊሆን ይችላል. ይህ በበርካታ አጋጣሚዎች ከቧንቧ ጋር የተያያዙ ችግሮችን ለመፍታት ረድቶኛል.
ምስል 4: Beam pipeline
በBigQuery ውስጥ የእኛን ውሂብ መድረስ
ስለዚህ, ወደ ጠረጴዛችን የሚገቡ መረጃዎችን የያዘ የቧንቧ መስመር ቀድሞውኑ ሊኖረን ይገባል. ይህንን ለመፈተሽ ወደ BigQuery ሄደን ውሂቡን ማየት እንችላለን። ከዚህ በታች ያለውን ትዕዛዝ ከተጠቀሙ በኋላ የውሂብ ስብስብ የመጀመሪያዎቹን ጥቂት መስመሮች ማየት አለብዎት. አሁን በBigQuery ውስጥ የተከማቸ መረጃ ስላለን፣ ተጨማሪ ትንታኔዎችን ማድረግ እንዲሁም ውሂቡን ከባልደረባዎች ጋር መጋራት እና የንግድ ጥያቄዎችን መመለስ እንችላለን።
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
ምስል 5፡ BigQuery
መደምደሚያ
ይህ ልኡክ ጽሁፍ የዥረት ማስተላለፊያ መስመርን ለመገንባት፣ እንዲሁም መረጃን የበለጠ ተደራሽ ለማድረግ መንገዶችን ለመፈለግ እንደ ጠቃሚ ምሳሌ እንደሚያገለግል ተስፋ እናደርጋለን። መረጃን በዚህ ቅርጸት ማከማቸት ብዙ ጥቅሞችን ይሰጠናል። አሁን ምን ያህል ሰዎች የእኛን ምርት እንደሚጠቀሙ ያሉ አስፈላጊ ጥያቄዎችን መመለስ እንችላለን? የተጠቃሚው መሰረት ከጊዜ ወደ ጊዜ እያደገ ነው? ሰዎች በጣም የሚገናኙት የትኞቹ የምርት ገጽታዎች ናቸው? እና መሆን የማይገባቸው ስህተቶች አሉ? እነዚህ ጥያቄዎች ለድርጅቱ ትኩረት የሚስቡ ናቸው. ለእነዚህ ጥያቄዎች ከተሰጡት መልሶች በተገኙት ግንዛቤዎች ላይ በመመስረት ምርቱን ማሻሻል እና የተጠቃሚዎችን ተሳትፎ ማሳደግ እንችላለን።
Beam ለዚህ ዓይነቱ የአካል ብቃት እንቅስቃሴ በጣም ጠቃሚ ነው እና ሌሎች በርካታ አስደሳች የአጠቃቀም ጉዳዮችም አሉት። ለምሳሌ፣ የአክሲዮን ምልክት መረጃን በእውነተኛ ጊዜ መተንተን እና በመተንተን ላይ ተመስርተህ ግብይቶችን ማድረግ ትችላለህ፣ ምናልባት ከተሽከርካሪዎች የመጣ ዳሳሽ መረጃ አለህ እና የትራፊክ ደረጃ ስሌትን ማስላት ትፈልጋለህ። እንዲሁም፣ ለምሳሌ የተጠቃሚ ውሂብን የሚሰበስብ እና የቁልፍ መለኪያዎችን ለመከታተል ዳሽቦርድ ለመፍጠር የሚጠቀም የጨዋታ ኩባንያ መሆን ትችላለህ። እሺ ክቡራን፣ ይህ ለሌላ ልጥፍ ርዕስ ነው፣ ስላነበባችሁ እናመሰግናለን፣ እና ሙሉውን ኮድ ማየት ለሚፈልጉ፣ ከታች የእኔ GitHub አገናኝ አለ።
ይኼው ነው.
ምንጭ: hab.com