අපි ප්‍රවාහ දත්ත සැකසුම් නල මාර්ගයක් සාදන්නෙමු. 2 කොටස

ආයුබෝවන් සියල්ලටම. අපි පාඨමාලාවේ සිසුන් සඳහා විශේෂයෙන් සකස් කරන ලද ලිපියේ අවසාන කොටසෙහි පරිවර්තනය බෙදා ගනිමු. දත්ත ඉංජිනේරු. ඔබට පළමු කොටස කියවිය හැකිය මෙහි.

තත්‍ය කාලීන නල මාර්ග සඳහා Apache Beam සහ DataFlow

අපි ප්‍රවාහ දත්ත සැකසුම් නල මාර්ගයක් සාදන්නෙමු. 2 කොටස

Google Cloud පිහිටුවීම

සටහන: මම Python 3 හි නල මාර්ගය ධාවනය කිරීමේදී ගැටළු ඇති නිසා නල මාර්ගය ධාවනය කිරීමට සහ අභිරුචි ලොග් දත්ත ප්‍රකාශ කිරීමට Google Cloud Shell භාවිතා කළෙමි. Google Cloud Shell Apache Beam සමඟ වඩාත් අනුකූල වන Python 2 භාවිතා කරයි.

නල මාර්ගය ආරම්භ කිරීම සඳහා, අපි සැකසුම් වලට ටිකක් හාරා ගත යුතුය. ඔබ මින් පෙර GCP භාවිතා නොකළ අය සඳහා, ඔබට මෙහි දක්වා ඇති පහත පියවර 6 අනුගමනය කිරීමට අවශ්‍ය වනු ඇත පිටුව.

මෙයින් පසු, අපට අපගේ ස්ක්‍රිප්ට් Google Cloud Storage වෙත උඩුගත කර ඒවා අපගේ Google Cloud Shel වෙත පිටපත් කිරීමට අවශ්‍ය වනු ඇත. වලාකුළු ආචයනයට උඩුගත කිරීම ඉතා සුළු දෙයකි (විස්තරයක් සොයාගත හැකිය මෙහි) අපගේ ගොනු පිටපත් කිරීමට, පහත රූපයේ 2 හි වම් පස ඇති පළමු අයිකනය ක්ලික් කිරීමෙන් අපට මෙවලම් තීරුවෙන් Google Cloud Shel විවෘත කළ හැක.

අපි ප්‍රවාහ දත්ත සැකසුම් නල මාර්ගයක් සාදන්නෙමු. 2 කොටස
2 රූපය

ගොනු පිටපත් කර අවශ්‍ය පුස්තකාල ස්ථාපනය කිරීමට අපට අවශ්‍ය විධාන පහත දැක්වේ.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

අපගේ දත්ත සමුදාය සහ වගුව නිර්මාණය කිරීම

අපි සියලු පිහිටුවීම් සම්බන්ධ පියවර සම්පූර්ණ කළ පසු, අපට කළ යුතු ඊළඟ දෙය වන්නේ BigQuery හි දත්ත කට්ටලයක් සහ වගුවක් නිර්මාණය කිරීමයි. මෙය කිරීමට ක්‍රම කිහිපයක් ඇත, නමුත් සරලම දෙය නම් දත්ත කට්ටලයක් සෑදීමෙන් Google Cloud කොන්සෝලය භාවිතා කිරීමයි. ඔබට පහත පියවර අනුගමනය කළ හැක ලින්ක්සැලැස්මක් සහිත වගුවක් සෑදීමට. අපේ මේසය ඇත තීරු 7 ක්, එක් එක් පරිශීලක ලොගයේ සංරචක වලට අනුරූප වේ. පහසුව සඳහා, අපි timelocal විචල්‍යය හැර අනෙකුත් සියලුම තීරු තන්තු ලෙස නිර්වචනය කර අප කලින් ජනනය කළ විචල්‍ය අනුව ඒවා නම් කරන්නෙමු. අපගේ මේසයේ පිරිසැලසුම රූප සටහන 3 හි මෙන් විය යුතුය.

අපි ප්‍රවාහ දත්ත සැකසුම් නල මාර්ගයක් සාදන්නෙමු. 2 කොටස
රූපය 3. වගු සැකැස්ම

පරිශීලක ලොග් දත්ත ප්‍රකාශයට පත් කිරීම

Pub/Sub යනු අපගේ නල මාර්ගයේ තීරණාත්මක අංගයකි, මන්ද එය බහු ස්වාධීන යෙදුම් එකිනෙකා සමඟ සන්නිවේදනය කිරීමට ඉඩ සලසයි. විශේෂයෙන්, එය යෙදුම් අතර පණිවිඩ යැවීමට සහ ලැබීමට ඉඩ සලසන අතරමැදියෙකු ලෙස ක්‍රියා කරයි. අපි කළ යුතු පළමු දෙය නම් මාතෘකාවක් නිර්මාණය කිරීමයි. කොන්සෝලයේ Pub/Sub වෙත ගොස් CREATE TOPIC ක්ලික් කරන්න.

පහත කේතය ඉහත නිර්වචනය කර ඇති ලොග් දත්ත උත්පාදනය කිරීමට අපගේ ස්ක්‍රිප්ටය අමතා පසුව සම්බන්ධ කර ලොග Pub/Sub වෙත යවයි. අපි කළ යුතු එකම දෙය වස්තුවක් නිර්මාණය කිරීමයි PublisherClient, ක්රමය භාවිතා කරමින් මාතෘකාවට මාර්ගය සඳහන් කරන්න topic_path සහ කාර්යය අමතන්න publish с topic_path සහ දත්ත. අපි ආනයනය කරන බව කරුණාවෙන් සලකන්න generate_log_line අපේ පිටපතෙන් stream_logs, එබැවින් මෙම ගොනු එකම ෆෝල්ඩරයේ ඇති බවට වග බලා ගන්න, එසේ නොමැතිනම් ඔබට ආනයන දෝෂයක් ලැබෙනු ඇත. එවිට අපට මෙය අපගේ ගූගල් කොන්සෝලය හරහා ධාවනය කළ හැක:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

ගොනුව ක්‍රියාත්මක වූ වහාම, පහත රූපයේ දැක්වෙන පරිදි, කොන්සෝලය වෙත ලොග් දත්ත ප්‍රතිදානය අපට දැකගත හැකි වනු ඇත. අපි භාවිතා නොකරන තාක් කල් මෙම ස්ක්‍රිප්ට් එක ක්‍රියා කරයි CTRL + Cඑය සම්පූර්ණ කිරීමට.

අපි ප්‍රවාහ දත්ත සැකසුම් නල මාර්ගයක් සාදන්නෙමු. 2 කොටස
රූපය 4. ප්රතිදානය publish_logs.py

අපගේ නල මාර්ග කේතය ලිවීම

දැන් අපි සියල්ල සූදානම් කර ඇති බැවින්, අපට විනෝදජනක කොටස ආරම්භ කළ හැකිය - බීම් සහ පයිතන් භාවිතයෙන් අපගේ නල මාර්ගය කේතනය කිරීම. බීම් නල මාර්ගයක් නිර්මාණය කිරීම සඳහා, අපි නල මාර්ගයේ වස්තුවක් (p) නිර්මාණය කළ යුතුය. අපි නල මාර්ග වස්තුවක් නිර්මාණය කළ පසු, අපට ක්‍රියාකරු භාවිතයෙන් එකින් එක කාර්යයන් කිහිපයක් යෙදිය හැකිය pipe (|). සාමාන්යයෙන්, කාර්ය ප්රවාහය පහත රූපයේ පරිදි පෙනේ.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

අපගේ කේතය තුළ, අපි අභිරුචි කාර්යයන් දෙකක් සාදන්නෙමු. කාර්යය regex_clean, එය දත්ත පරිලෝකනය කර ශ්‍රිතය භාවිතයෙන් PATTERNS ලැයිස්තුව මත පදනම්ව අනුරූප පේළිය ලබා ගනී re.search. ශ්‍රිතය කොමාවෙන් වෙන් වූ තන්තුවක් ලබා දෙයි. ඔබ නිත්‍ය ප්‍රකාශන විශේෂඥයෙක් නොවේ නම්, මෙය පරීක්ෂා කිරීමට මම නිර්දේශ කරමි නිබන්ධනය සහ කේතය පරීක්ෂා කිරීමට notepad එකක පුහුණු වන්න. මෙයින් පසු අපි අභිරුචි ParDo ශ්‍රිතයක් නිර්වචනය කරමු බෙදුණු, එය සමාන්තර සැකසුම් සඳහා කදම්භ පරිණාමනයේ ප්‍රභේදයකි. Python වලදී, මෙය විශේෂ ආකාරයකින් සිදු කෙරේ - අපි DoFn Beam පන්තියෙන් උරුම වන පන්තියක් නිර්මාණය කළ යුතුය. Split ශ්‍රිතය පෙර ශ්‍රිතයෙන් විග්‍රහ කළ පේළිය ගෙන අපගේ BigQuery වගුවේ ඇති තීරු නාමවලට ​​අනුරූප යතුරු සහිත ශබ්ද කෝෂ ලැයිස්තුවක් ආපසු ලබා දෙයි. මෙම කාර්යය ගැන සටහන් කිරීමට යමක් තිබේ: මට ආනයනය කිරීමට සිදු විය datetime එය වැඩ කිරීමට ශ්‍රිතයක් ඇතුලේ. මම ගොනුවේ ආරම්භයේදීම ආයාත දෝෂයක් ලබමින් සිටියෙමි, එය අමුතුයි. මෙම ලැයිස්තුව පසුව කාර්යයට යවනු ලැබේ WriteToBigQuery, එය සරලව අපගේ දත්ත වගුවට එක් කරයි. Batch DataFlow Job සහ Streaming DataFlow Job සඳහා වන කේතය පහත දක්වා ඇත. කණ්ඩායම සහ ප්‍රවාහ කේතය අතර ඇති එකම වෙනස වන්නේ කණ්ඩායම තුළ අපි CSV කියවීමයි src_pathකාර්යය භාවිතා කරමින් ReadFromText බීම් සිට.

Batch DataFlow Job (කාණ්ඩ සැකසීම)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (ප්‍රවාහ සැකසීම)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

නල මාර්ගය ධාවනය කිරීම

අපට විවිධ ආකාරවලින් නල මාර්ගය ධාවනය කළ හැකිය. අපට අවශ්‍ය නම්, දුරස්ථව GCP වෙත ලොග් වීමේදී අපට එය ටර්මිනලයකින් දේශීයව ධාවනය කළ හැකිය.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

කෙසේ වෙතත්, අපි එය DataFlow භාවිතයෙන් ධාවනය කරන්නෙමු. පහත දැක්වෙන අවශ්‍ය පරාමිති සැකසීමෙන් පහත විධානය භාවිතයෙන් අපට මෙය කළ හැකිය.

  • project — ඔබේ GCP ව්‍යාපෘතියේ ID.
  • runner ඔබේ වැඩසටහන විශ්ලේෂණය කර ඔබේ නල මාර්ගය ගොඩනඟන නල මාර්ග ධාවකයකි. වලාකුළෙහි ධාවනය කිරීමට, ඔබ DataflowRunner එකක් සඳහන් කළ යුතුය.
  • staging_location — කාර්යය ඉටු කරන ප්‍රොසෙසරවලට අවශ්‍ය කේත පැකේජ සුචිගත කිරීම සඳහා Cloud Dataflow cloud storage වෙත යන මාර්ගය.
  • temp_location — නල මාර්ගය ක්‍රියාත්මක වන විට සාදන ලද තාවකාලික රැකියා ලිපිගොනු ගබඩා කිරීම සඳහා Cloud Dataflow cloud storage වෙත මාර්ගය.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

මෙම විධානය ක්‍රියාත්මක වන අතරතුර, අපට ගූගල් කොන්සෝලයේ ඇති DataFlow ටැබයට ගොස් අපගේ නල මාර්ගය නැරඹිය හැකිය. අපි නල මාර්ගය මත ක්ලික් කරන විට, අපට රූපය 4 ට සමාන යමක් දැකිය යුතුය. නිදොස් කිරීමේ අරමුණු සඳහා, සවිස්තරාත්මක ලඝු-සටහන් බැලීම සඳහා ලොග් වෙත ගොස් Stackdriver වෙත යාම ඉතා ප්‍රයෝජනවත් වේ. මෙය අවස්ථා ගණනාවකදී නල මාර්ග ගැටළු විසඳීමට මට උපකාර කර ඇත.

අපි ප්‍රවාහ දත්ත සැකසුම් නල මාර්ගයක් සාදන්නෙමු. 2 කොටස
රූපය 4: කදම්භ වාහකය

BigQuery හි අපගේ දත්ත වෙත ප්‍රවේශ වන්න

එබැවින්, අපගේ වගුව තුළට දත්ත ගලා යන නල මාර්ගයක් අපට දැනටමත් තිබිය යුතුය. මෙය පරීක්ෂා කිරීම සඳහා, අපට BigQuery වෙත ගොස් දත්ත දෙස බැලිය හැකිය. පහත විධානය භාවිතා කිරීමෙන් පසු ඔබ දත්ත කට්ටලයේ පළමු පේළි කිහිපය දැකිය යුතුය. දැන් අපි BigQuery තුළ දත්ත ගබඩා කර ඇති බැවින්, අපට වැඩිදුර විශ්ලේෂණය කිරීමට මෙන්ම සගයන් සමඟ දත්ත බෙදා ගැනීමට සහ ව්‍යාපාරික ප්‍රශ්නවලට පිළිතුරු දීමට පටන් ගත හැකිය.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

අපි ප්‍රවාහ දත්ත සැකසුම් නල මාර්ගයක් සාදන්නෙමු. 2 කොටස
රූපය 5: BigQuery

නිගමනය

මෙම සටහන ප්‍රවාහ දත්ත නල මාර්ගයක් නිර්මාණය කිරීම මෙන්ම දත්ත වඩාත් ප්‍රවේශ විය හැකි ක්‍රම සොයා ගැනීම සඳහා ප්‍රයෝජනවත් උදාහරණයක් ලෙස සේවය කරනු ඇතැයි අපි බලාපොරොත්තු වෙමු. මෙම ආකෘතියේ දත්ත ගබඩා කිරීම අපට බොහෝ වාසි ලබා දෙයි. දැන් අපට අපගේ නිෂ්පාදනය භාවිතා කරන පුද්ගලයින් කී දෙනෙක් වැනි වැදගත් ප්‍රශ්නවලට පිළිතුරු දීමට පටන් ගත හැකිය. කාලයත් සමඟ ඔබේ පරිශීලක පදනම වර්ධනය වේද? මිනිසුන් වැඩිපුරම අන්තර්ක්‍රියා කරන්නේ නිෂ්පාදනයේ කුමන අංශද? අනික නොවිය යුතු තැන්වල වැරදි තියෙනවද? සංවිධානයට උනන්දුවක් දක්වන ප්රශ්න මේවාය. මෙම ප්‍රශ්නවලට පිළිතුරුවලින් මතුවන තීක්ෂ්ණ බුද්ධිය මත පදනම්ව, අපට නිෂ්පාදනය වැඩිදියුණු කිරීමට සහ පරිශීලක නියැලීම වැඩි කිරීමට හැකිය.

මෙම ආකාරයේ ව්‍යායාම සඳහා බීම් ඇත්තෙන්ම ප්‍රයෝජනවත් වන අතර තවත් රසවත් භාවිත අවස්ථා ගණනාවක්ද ඇත. උදාහරණයක් ලෙස, ඔබට තථ්‍ය කාලය තුළ කොටස් ටික් දත්ත විශ්ලේෂණය කිරීමට සහ විශ්ලේෂණය මත පදනම්ව වෙළඳාම් කිරීමට ඔබට අවශ්‍ය විය හැකිය, සමහරවිට ඔබට වාහනවලින් එන සංවේදක දත්ත ඇති අතර ගමනාගමන මට්ටමේ ගණනය කිරීම් ගණනය කිරීමට අවශ්‍ය වේ. ඔබට, උදාහරණයක් ලෙස, පරිශීලක දත්ත එකතු කරන සහ යතුරු ප්‍රමිතික හඹා යාමට උපකරණ පුවරු සෑදීමට එය භාවිතා කරන ක්‍රීඩා සමාගමක් විය හැකිය. හරි මහත්තයෝ මේක වෙන පෝස්ට් එකකට මාතෘකාවක්, කියෙව්වට ස්තුතියි, සම්පූර්ණ කෝඩ් එක බලන්න කැමති අයට මගේ GitHub එකට ලින්ක් එක පහලින් තියෙනවා.

https://github.com/DFoly/User_log_pipeline

එපමණයි. පළමු කොටස කියවන්න.

මූලාශ්රය: www.habr.com

අදහස් එක් කරන්න