መረጃን ለማሰራጨት የቧንቧ መስመር እንፈጥራለን. ክፍል 2

ሰላም ሁላችሁም። በተለይ ለኮርሱ ተማሪዎች የተዘጋጀውን የጽሁፉን የመጨረሻ ክፍል ትርጉም እናካፍላለን የውሂብ መሐንዲስ. የመጀመሪያው ክፍል ሊገኝ ይችላል እዚህ.

Apache Beam እና DataFlow ለእውነተኛ ጊዜ የቧንቧ መስመሮች

መረጃን ለማሰራጨት የቧንቧ መስመር እንፈጥራለን. ክፍል 2

ጉግል ክላውድን በማዘጋጀት ላይ

ማስታወሻ፡ የቧንቧ መስመርን ለማስኬድ እና የተጠቃሚ ሎግ ዳታ ለማተም ጎግል ክላውድ ሼልን ተጠቀምኩ ምክንያቱም በፓይዘን 3 ውስጥ የቧንቧ መስመርን ለማስኬድ ችግር ስላጋጠመኝ ነው። ጎግል ክላውድ ሼል ከ Apache Beam ጋር የበለጠ የሚስማማውን Python 2 ይጠቀማል።

የቧንቧ መስመሩን ለማስኬድ, ወደ ቅንጅቶች ትንሽ መቆፈር አለብን. ከዚህ በፊት GCP ን ተጠቅማ ላላቹ፣ በዚህ ውስጥ የሚከተሉትን 6 ደረጃዎች ማጠናቀቅ አለባችሁ ገጽ.

ከዚያ በኋላ ስክሪፕቶቻችንን ወደ ጎግል ክላውድ ማከማቻ መስቀል እና ወደ ጎግል ክላውድ ሼል መቅዳት አለብን። ወደ ደመና ማከማቻ መስቀል በጣም ቀላል ነው (መግለጫ ሊገኝ ይችላል። እዚህ). ፋይሎቻችንን ለመቅዳት፣ ከታች በስእል 2 በግራ በኩል ያለውን የመጀመሪያውን አዶ ጠቅ በማድረግ ጎግል ክላውድ ሼልን ከመሳሪያ አሞሌው መክፈት እንችላለን።

መረጃን ለማሰራጨት የቧንቧ መስመር እንፈጥራለን. ክፍል 2
የ 2 ስዕል

ፋይሎቹን ለመቅዳት እና አስፈላጊውን ቤተ-መጽሐፍት ለመጫን የሚያስፈልጉን ትዕዛዞች ከዚህ በታች ተዘርዝረዋል.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

የእኛን የውሂብ ጎታ እና ጠረጴዛ መፍጠር

ሁሉንም የማዋቀር ደረጃዎች እንደጨረስን ቀጣዩ ነገር በBigQuery ውስጥ የውሂብ ስብስብ እና ሠንጠረዥ መፍጠር ነው። ይህንን ለማድረግ ብዙ መንገዶች አሉ ነገር ግን ቀላሉ መንገድ የውሂብ ስብስብ በመፍጠር ጎግል ክላውድ ኮንሶል መጠቀም ነው። በሚከተለው ውስጥ ያሉትን ደረጃዎች መከተል ይችላሉ ማያያዣከመርሃግብሩ ጋር ጠረጴዛ ለመፍጠር. የእኛ ጠረጴዛ ይኖረዋል 7 አምዶች, ከእያንዳንዱ የተጠቃሚ ምዝግብ አካል ክፍሎች ጋር ይዛመዳል. ለምቾት ሲባል፣ ሁሉንም ዓምዶች እንደ ሕብረቁምፊዎች (የሕብረቁምፊ ዓይነት) እንገልጻቸዋለን፣ ከጊዜአዊው ተለዋዋጭ በስተቀር፣ እና ቀደም ብለን ባፈጠርናቸው ተለዋዋጮች መሠረት እንሰይማቸዋለን። የእኛ የጠረጴዛ አቀማመጥ ምስል 3 መምሰል አለበት.

መረጃን ለማሰራጨት የቧንቧ መስመር እንፈጥራለን. ክፍል 2
ምስል 3. የሰንጠረዥ እቅድ

የተጠቃሚ ምዝግብ ማስታወሻ ውሂብ ማተም

በርካታ ገለልተኛ አፕሊኬሽኖች እርስ በርስ እንዲግባቡ ስለሚያስችል ፐብ/ንኡስ የቧንቧ መስመራችን ወሳኝ አካል ነው። በተለይም በመተግበሪያዎች መካከል መልዕክቶችን እንድንልክ እና እንድንቀበል የሚያስችለን እንደ አማላጅ ሆኖ ይሰራል። ማድረግ ያለብን የመጀመሪያው ነገር ርዕስ (ርዕስ) መፍጠር ነው. በቀላሉ በኮንሶሉ ውስጥ ወደ አታሚ/ንዑስ ይሂዱ እና ርዕስ ፍጠርን ጠቅ ያድርጉ።

ከዚህ በታች ያለው ኮድ የእኛን ስክሪፕት በመደወል ከላይ የተገለጸውን የምዝግብ ማስታወሻ ውሂብ ያመነጫል እና ከዚያ ያገናኛል እና ምዝግብ ማስታወሻዎቹን ወደ pub/Sub ይልካል። እኛ ማድረግ ያለብን ነገር መፍጠር ብቻ ነው። አታሚ ደንበኛ, ዘዴውን በመጠቀም የገጽታውን መንገድ ይግለጹ topic_path እና ተግባሩን ይደውሉ publish с topic_path እና ውሂብ. እባኮትን እያስመጣን ነው። generate_log_line ከስክሪፕታችን stream_logsስለዚህ እነዚያ ፋይሎች በተመሳሳይ አቃፊ ውስጥ መሆናቸውን ያረጋግጡ አለበለዚያ የማስመጣት ስህተት ይደርስብዎታል. ይህንን በመጠቀም በ google ኮንሶል በኩል ማሄድ እንችላለን፡-

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

ፋይሉ አንዴ ከተሰራ በኋላ ከታች በስዕሉ ላይ እንደሚታየው የምዝግብ ማስታወሻውን ወደ ኮንሶሉ ላይ ያለውን ውጤት ለመመልከት እንችላለን. እስካልተጠቀምን ድረስ ይህ ስክሪፕት ይሰራል CTRL + Cለማጠናቀቅ.

መረጃን ለማሰራጨት የቧንቧ መስመር እንፈጥራለን. ክፍል 2
ምስል 4. መደምደሚያ publish_logs.py

የእኛን የቧንቧ መስመር ኮድ መጻፍ

አሁን ሁሉንም ነገር አዘጋጅተናል፣ ወደ አዝናኝ ክፍል ልንሄድ እንችላለን - Beam እና Python በመጠቀም የቧንቧ መስመሮቻችንን ኮድ ማድረግ። የቢም ቧንቧን ለመፍጠር የቧንቧ መስመር ነገር (p) መፍጠር አለብን. የቧንቧ መስመር ነገርን ከፈጠርን በኋላ ኦፕሬተሩን በመጠቀም ብዙ ተግባራትን አንድ በአንድ መተግበር እንችላለን pipe (|). በአጠቃላይ የስራ ሂደቱ ከታች ያለውን ምስል ይመስላል.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

በእኛ ኮድ ውስጥ, በተጠቃሚ የተገለጹ ሁለት ተግባራትን እንፈጥራለን. ተግባር regex_cleanተግባሩን በመጠቀም በPATTERNS ዝርዝር ላይ በመመስረት መረጃውን የሚቃኝ እና ተዛማጅ ሕብረቁምፊውን ያወጣል። re.search. ተግባሩ በነጠላ ሰረዝ የተለየ ሕብረቁምፊ ይመልሳል። በመደበኛ አገላለጾች ላይ ኤክስፐርት ካልሆኑ, ይህንን እንዲመለከቱ እመክራለሁ. አጋዥ ስልጠና እና ኮዱን ለመፈተሽ በማስታወሻ ደብተር ውስጥ ይለማመዱ። ከዚያ የተጠራውን ብጁ የፓርዶ ተግባር እንገልፃለን። ሰነጠቀ, ይህም የ Beam ትራንስፎርሜሽን ለትይዩ ማቀነባበሪያ ልዩነት ነው. በፓይዘን ውስጥ ይህ የሚደረገው በልዩ መንገድ ነው - ከ DoFn Beam ክፍል የሚወርስ ክፍል መፍጠር አለብን። የSplit ተግባር የተተነተነውን ሕብረቁምፊ ከቀደመው ተግባር ወስዶ የመዝገበ-ቃላቶችን ዝርዝር ከBigQuery ሰንጠረዡ ውስጥ ካሉት የአምድ ስሞች ጋር በሚዛመዱ ቁልፎች ይመልሳል። በዚህ ተግባር ላይ ልብ ሊባል የሚገባው ነገር አለ፡ ማስመጣት ነበረብኝ datetime እንዲሰራ ለማድረግ በተግባሩ ውስጥ. በፋይሉ መጀመሪያ ላይ በማስመጣት ላይ ስህተት እያጋጠመኝ ነበር፣ ይህም እንግዳ ነበር። ይህ ዝርዝር ወደ ተግባሩ ይተላለፋል ቶቢግ መጠይቅ ይፃፉ, ይህም በቀላሉ የእኛን ውሂብ ወደ ጠረጴዛው ይጨምራል. የ Batch DataFlow የስራ እና የዥረት ዳታ ፍሰት ስራ ኮድ ከዚህ በታች ቀርቧል። በባች እና በዥረት ኮድ መካከል ያለው ብቸኛው ልዩነት በቡድን ውስጥ CSV ን እናነባለን። src_pathተግባሩን በመጠቀም ReadFromText ከቢም.

ባች የውሂብ ፍሰት ሥራ (የባች ሂደት)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

የውሂብ ፍሰት ስራ (የዥረት ሂደት)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

የቧንቧ መስመርን ማካሄድ

የቧንቧ መስመርን በተለያዩ መንገዶች መጀመር እንችላለን. ከፈለግን ወደ ጂሲፒ በርቀት በመግባት ከተርሚናል ልናስኬደው እንችላለን።

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

ሆኖም ግን በ DataFlow ልናስኬደው ነው። የሚከተሉትን አስፈላጊ መለኪያዎች በማዘጋጀት ከዚህ በታች ባለው ትእዛዝ ይህንን ማድረግ እንችላለን ።

  • project - የእርስዎ GCP ፕሮጀክት መታወቂያ።
  • runner ፕሮግራምህን የሚተነተን እና ቧንቧህን የሚገነባ የቧንቧ መስመር ሯጭ ነው። በደመና ውስጥ ለማሄድ፣ DataflowRunnerን መጥቀስ አለብዎት።
  • staging_location ስራውን በሚያከናውኑት ተቆጣጣሪዎች የሚፈለጉትን የኮድ ፓኬጆችን ለመጠቆም ወደ የደመና ዳታ ፍሰት ደመና ማከማቻ መንገድ ነው።
  • temp_location - የቧንቧ መስመር በሚሠራበት ጊዜ የተፈጠሩ ጊዜያዊ የሥራ ፋይሎችን ለማስቀመጥ ወደ ደመና የውሂብ ፍሰት ደመና ማከማቻ መንገድ።
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

ይህ ትእዛዝ እየሰራ ሳለ በ google ኮንሶል ውስጥ ወደ DataFlow ትር ሄደን የቧንቧ መስመሮቻችንን ማየት እንችላለን። የቧንቧ መስመርን ጠቅ በማድረግ, ከስእል 4 ጋር ተመሳሳይ የሆነ ነገር ማየት አለብን. ለማረም ዓላማዎች, ወደ ሎግዎች እና ከዚያም ወደ Stackdriver በመሄድ ዝርዝር ምዝግብ ማስታወሻዎችን ለማየት በጣም ጠቃሚ ሊሆን ይችላል. ይህ በበርካታ አጋጣሚዎች ከቧንቧ ጋር የተያያዙ ችግሮችን ለመፍታት ረድቶኛል.

መረጃን ለማሰራጨት የቧንቧ መስመር እንፈጥራለን. ክፍል 2
ምስል 4: Beam pipeline

በBigQuery ውስጥ የእኛን ውሂብ መድረስ

ስለዚህ, ወደ ጠረጴዛችን የሚገቡ መረጃዎችን የያዘ የቧንቧ መስመር ቀድሞውኑ ሊኖረን ይገባል. ይህንን ለመፈተሽ ወደ BigQuery ሄደን ውሂቡን ማየት እንችላለን። ከዚህ በታች ያለውን ትዕዛዝ ከተጠቀሙ በኋላ የውሂብ ስብስብ የመጀመሪያዎቹን ጥቂት መስመሮች ማየት አለብዎት. አሁን በBigQuery ውስጥ የተከማቸ መረጃ ስላለን፣ ተጨማሪ ትንታኔዎችን ማድረግ እንዲሁም ውሂቡን ከባልደረባዎች ጋር መጋራት እና የንግድ ጥያቄዎችን መመለስ እንችላለን።

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

መረጃን ለማሰራጨት የቧንቧ መስመር እንፈጥራለን. ክፍል 2
ምስል 5፡ BigQuery

መደምደሚያ

ይህ ልኡክ ጽሁፍ የዥረት ማስተላለፊያ መስመርን ለመገንባት፣ እንዲሁም መረጃን የበለጠ ተደራሽ ለማድረግ መንገዶችን ለመፈለግ እንደ ጠቃሚ ምሳሌ እንደሚያገለግል ተስፋ እናደርጋለን። መረጃን በዚህ ቅርጸት ማከማቸት ብዙ ጥቅሞችን ይሰጠናል። አሁን ምን ያህል ሰዎች የእኛን ምርት እንደሚጠቀሙ ያሉ አስፈላጊ ጥያቄዎችን መመለስ እንችላለን? የተጠቃሚው መሰረት ከጊዜ ወደ ጊዜ እያደገ ነው? ሰዎች በጣም የሚገናኙት የትኞቹ የምርት ገጽታዎች ናቸው? እና መሆን የማይገባቸው ስህተቶች አሉ? እነዚህ ጥያቄዎች ለድርጅቱ ትኩረት የሚስቡ ናቸው. ለእነዚህ ጥያቄዎች ከተሰጡት መልሶች በተገኙት ግንዛቤዎች ላይ በመመስረት ምርቱን ማሻሻል እና የተጠቃሚዎችን ተሳትፎ ማሳደግ እንችላለን።

Beam ለዚህ ዓይነቱ የአካል ብቃት እንቅስቃሴ በጣም ጠቃሚ ነው እና ሌሎች በርካታ አስደሳች የአጠቃቀም ጉዳዮችም አሉት። ለምሳሌ፣ የአክሲዮን ምልክት መረጃን በእውነተኛ ጊዜ መተንተን እና በመተንተን ላይ ተመስርተህ ግብይቶችን ማድረግ ትችላለህ፣ ምናልባት ከተሽከርካሪዎች የመጣ ዳሳሽ መረጃ አለህ እና የትራፊክ ደረጃ ስሌትን ማስላት ትፈልጋለህ። እንዲሁም፣ ለምሳሌ የተጠቃሚ ውሂብን የሚሰበስብ እና የቁልፍ መለኪያዎችን ለመከታተል ዳሽቦርድ ለመፍጠር የሚጠቀም የጨዋታ ኩባንያ መሆን ትችላለህ። እሺ ክቡራን፣ ይህ ለሌላ ልጥፍ ርዕስ ነው፣ ስላነበባችሁ እናመሰግናለን፣ እና ሙሉውን ኮድ ማየት ለሚፈልጉ፣ ከታች የእኔ GitHub አገናኝ አለ።

https://github.com/DFoly/User_log_pipeline

ይኼው ነው. የመጀመሪያውን ክፍል ያንብቡ.

ምንጭ: hab.com

አስተያየት ያክሉ