நாங்கள் ஸ்ட்ரீம் தரவு செயலாக்க பைப்லைனை உருவாக்குகிறோம். பகுதி 2

அனைவருக்கும் வணக்கம். கட்டுரையின் இறுதிப் பகுதியின் மொழிபெயர்ப்பைப் பகிர்கிறோம், குறிப்பாக பாடநெறி மாணவர்களுக்காக தயாரிக்கப்பட்டது. தரவு பொறியாளர். முதல் பகுதியைப் படிக்கலாம் இங்கே.

நிகழ்நேர பைப்லைன்களுக்கான அப்பாச்சி பீம் மற்றும் டேட்டாஃப்ளோ

நாங்கள் ஸ்ட்ரீம் தரவு செயலாக்க பைப்லைனை உருவாக்குகிறோம். பகுதி 2

Google Cloud ஐ அமைக்கிறது

குறிப்பு: பைதான் 3 இல் பைப்லைனை இயக்குவதில் சிக்கல் இருப்பதால், பைப்லைனை இயக்கவும் தனிப்பயன் பதிவுத் தரவை வெளியிடவும் Google Cloud Shell ஐப் பயன்படுத்தினேன். Google Cloud Shell ஆனது Apache Beam உடன் மிகவும் ஒத்துப்போகும் Python 2 ஐப் பயன்படுத்துகிறது.

பைப்லைனைத் தொடங்க, அமைப்புகளில் சிறிது தோண்டி எடுக்க வேண்டும். உங்களில் இதற்கு முன் GCP ஐப் பயன்படுத்தாதவர்கள், இதில் விவரிக்கப்பட்டுள்ள பின்வரும் 6 படிகளைப் பின்பற்ற வேண்டும் பக்கம்.

இதற்குப் பிறகு, எங்கள் ஸ்கிரிப்ட்களை Google கிளவுட் ஸ்டோரேஜில் பதிவேற்றி, அவற்றை எங்கள் Google கிளவுட் ஷெல்லில் நகலெடுக்க வேண்டும். மேகக்கணி சேமிப்பகத்தில் பதிவேற்றுவது மிகவும் அற்பமானது (விளக்கத்தைக் காணலாம் இங்கே) எங்கள் கோப்புகளை நகலெடுக்க, கீழே உள்ள படம் 2 இல் இடதுபுறத்தில் உள்ள முதல் ஐகானைக் கிளிக் செய்வதன் மூலம் கருவிப்பட்டியில் இருந்து Google Cloud Shel ஐத் திறக்கலாம்.

நாங்கள் ஸ்ட்ரீம் தரவு செயலாக்க பைப்லைனை உருவாக்குகிறோம். பகுதி 2
2 படம்

கோப்புகளை நகலெடுத்து தேவையான நூலகங்களை நிறுவ வேண்டிய கட்டளைகள் கீழே பட்டியலிடப்பட்டுள்ளன.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

எங்கள் தரவுத்தளம் மற்றும் அட்டவணையை உருவாக்குதல்

அமைவு தொடர்பான அனைத்து படிகளையும் முடித்தவுடன், நாம் அடுத்து செய்ய வேண்டியது BigQuery இல் தரவுத்தொகுப்பு மற்றும் அட்டவணையை உருவாக்குவதுதான். இதைச் செய்ய பல வழிகள் உள்ளன, ஆனால் முதலில் ஒரு தரவுத்தொகுப்பை உருவாக்குவதன் மூலம் Google கிளவுட் கன்சோலைப் பயன்படுத்துவது எளிமையானது. நீங்கள் கீழே உள்ள படிகளைப் பின்பற்றலாம் இணைப்பைஒரு திட்டத்துடன் ஒரு அட்டவணையை உருவாக்க. எங்கள் மேஜையில் இருக்கும் 7 நெடுவரிசைகள், ஒவ்வொரு பயனர் பதிவின் கூறுகளுக்கும் தொடர்புடையது. வசதிக்காக, டைம்லோக்கல் மாறி தவிர, எல்லா நெடுவரிசைகளையும் சரங்களாக வரையறுப்போம், மேலும் நாம் முன்பு உருவாக்கிய மாறிகளுக்கு ஏற்ப பெயரிடுவோம். எங்கள் அட்டவணையின் தளவமைப்பு படம் 3 இல் உள்ளதைப் போல இருக்க வேண்டும்.

நாங்கள் ஸ்ட்ரீம் தரவு செயலாக்க பைப்லைனை உருவாக்குகிறோம். பகுதி 2
படம் 3. அட்டவணை அமைப்பு

பயனர் பதிவுத் தரவை வெளியிடுகிறது

பப்/சப் என்பது எங்கள் பைப்லைனின் முக்கியமான அங்கமாகும், ஏனெனில் இது பல சுயாதீன பயன்பாடுகளை ஒருவருக்கொருவர் தொடர்பு கொள்ள அனுமதிக்கிறது. குறிப்பாக, அப்ளிகேஷன்களுக்கு இடையே செய்திகளை அனுப்பவும் பெறவும் அனுமதிக்கும் ஒரு இடைத்தரகராக இது செயல்படுகிறது. நாம் செய்ய வேண்டிய முதல் விஷயம் ஒரு தலைப்பை உருவாக்குவது. கன்சோலில் உள்ள பப்/சப் என்பதற்குச் சென்று, தலைப்பை உருவாக்கு என்பதைக் கிளிக் செய்யவும்.

கீழே உள்ள குறியீடு மேலே வரையறுக்கப்பட்ட பதிவுத் தரவை உருவாக்க எங்கள் ஸ்கிரிப்டை அழைக்கிறது, பின்னர் பதிவுகளை பப்/சப்க்கு இணைத்து அனுப்புகிறது. நாம் செய்ய வேண்டியது ஒரு பொருளை உருவாக்குவது மட்டுமே வெளியீட்டாளர் கிளையண்ட், முறையைப் பயன்படுத்தி தலைப்புக்கான பாதையைக் குறிப்பிடவும் topic_path மற்றும் செயல்பாட்டை அழைக்கவும் publish с topic_path மற்றும் தரவு. நாங்கள் இறக்குமதி செய்கிறோம் என்பதை நினைவில் கொள்க generate_log_line எங்கள் ஸ்கிரிப்டில் இருந்து stream_logs, எனவே இந்தக் கோப்புகள் ஒரே கோப்புறையில் இருப்பதை உறுதிசெய்யவும், இல்லையெனில் நீங்கள் இறக்குமதி பிழையைப் பெறுவீர்கள். இதைப் பயன்படுத்தி நமது Google கன்சோல் மூலம் இதை இயக்கலாம்:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

கோப்பு இயங்கியவுடன், கீழே உள்ள படத்தில் காட்டப்பட்டுள்ளபடி, கன்சோலுக்கான பதிவுத் தரவின் வெளியீட்டைக் காண முடியும். நாம் பயன்படுத்தாத வரை இந்த ஸ்கிரிப்ட் வேலை செய்யும் Ctrl + Cஅதை முடிக்க.

நாங்கள் ஸ்ட்ரீம் தரவு செயலாக்க பைப்லைனை உருவாக்குகிறோம். பகுதி 2
படம் 4. வெளியீடு publish_logs.py

எங்கள் பைப்லைன் குறியீட்டை எழுதுதல்

இப்போது நாங்கள் எல்லாவற்றையும் தயார் செய்துவிட்டோம், நாம் வேடிக்கையான பகுதியைத் தொடங்கலாம் - பீம் மற்றும் பைத்தானைப் பயன்படுத்தி எங்கள் பைப்லைனைக் குறியிடுதல். பீம் பைப்லைனை உருவாக்க, பைப்லைன் பொருளை (p) உருவாக்க வேண்டும். பைப்லைன் பொருளை உருவாக்கியவுடன், ஆபரேட்டரைப் பயன்படுத்தி பல செயல்பாடுகளை ஒன்றன் பின் ஒன்றாகப் பயன்படுத்தலாம் pipe (|). பொதுவாக, பணிப்பாய்வு கீழே உள்ள படம் போல் தெரிகிறது.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

எங்கள் குறியீட்டில், இரண்டு தனிப்பயன் செயல்பாடுகளை உருவாக்குவோம். செயல்பாடு regex_clean, இது தரவை ஸ்கேன் செய்து, செயல்பாட்டைப் பயன்படுத்தி பேட்டர்ன்ஸ் பட்டியலின் அடிப்படையில் தொடர்புடைய வரிசையை மீட்டெடுக்கிறது re.search. செயல்பாடு கமாவால் பிரிக்கப்பட்ட சரத்தை வழங்குகிறது. நீங்கள் வழக்கமான வெளிப்பாடு நிபுணர் இல்லையென்றால், இதைப் பார்க்க பரிந்துரைக்கிறேன் பயிற்சி குறியீட்டைச் சரிபார்க்க நோட்பேடில் பயிற்சி செய்யவும். இதற்குப் பிறகு, தனிப்பயன் ParDo செயல்பாட்டை வரையறுக்கிறோம் பிரி, இது இணை செயலாக்கத்திற்கான பீம் மாற்றத்தின் மாறுபாடு ஆகும். பைத்தானில், இது ஒரு சிறப்பு வழியில் செய்யப்படுகிறது - DoFn பீம் வகுப்பிலிருந்து பெறப்பட்ட ஒரு வகுப்பை நாம் உருவாக்க வேண்டும். ஸ்பிலிட் செயல்பாடு முந்தைய செயல்பாட்டிலிருந்து பாகுபடுத்தப்பட்ட வரிசையை எடுத்து, எங்கள் BigQuery அட்டவணையில் உள்ள நெடுவரிசைப் பெயர்களுடன் தொடர்புடைய விசைகளுடன் அகராதிகளின் பட்டியலை வழங்குகிறது. இந்த செயல்பாட்டைப் பற்றி கவனிக்க வேண்டிய ஒன்று உள்ளது: நான் இறக்குமதி செய்ய வேண்டியிருந்தது datetime ஒரு செயல்பாட்டின் உள்ளே அதைச் செயல்பட வைக்கிறது. கோப்பின் தொடக்கத்தில் எனக்கு ஒரு இறக்குமதி பிழை ஏற்பட்டது, இது வித்தியாசமாக இருந்தது. இந்த பட்டியல் பின்னர் செயல்பாட்டிற்கு அனுப்பப்படுகிறது WriteToBigQuery, இது எங்கள் தரவை அட்டவணையில் சேர்க்கிறது. Batch DataFlow Job மற்றும் Streaming DataFlow Jobக்கான குறியீடு கீழே கொடுக்கப்பட்டுள்ளது. தொகுதிக்கும் ஸ்ட்ரீமிங் குறியீட்டிற்கும் உள்ள ஒரே வித்தியாசம் என்னவென்றால், தொகுப்பில் நாம் CSV ஐப் படிக்கிறோம் src_pathசெயல்பாட்டைப் பயன்படுத்தி ReadFromText பீமில் இருந்து.

Batch DataFlow Job (தொகுப்பு செயலாக்கம்)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

ஸ்ட்ரீமிங் டேட்டாஃப்ளோ வேலை (ஸ்ட்ரீம் செயலாக்கம்)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

கன்வேயரைத் தொடங்குதல்

நாம் பல்வேறு வழிகளில் பைப்லைனை இயக்கலாம். நாம் விரும்பினால், ஜிசிபியில் ரிமோட் மூலம் உள்நுழையும்போது டெர்மினலில் இருந்து உள்நாட்டில் இயக்கலாம்.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

இருப்பினும், டேட்டாஃப்ளோவைப் பயன்படுத்தி அதை இயக்கப் போகிறோம். பின்வரும் தேவையான அளவுருக்களை அமைப்பதன் மூலம் கீழே உள்ள கட்டளையைப் பயன்படுத்தி இதைச் செய்யலாம்.

  • project — உங்கள் GCP திட்டத்தின் ஐடி.
  • runner ஒரு பைப்லைன் ரன்னர், இது உங்கள் திட்டத்தை ஆய்வு செய்து உங்கள் பைப்லைனை உருவாக்குகிறது. மேகக்கணியில் இயங்க, நீங்கள் DataflowRunner ஐக் குறிப்பிட வேண்டும்.
  • staging_location — க்ளவுட் டேட்டாஃப்ளோ கிளவுட் ஸ்டோரேஜுக்கான பாதை, பணியைச் செய்யும் செயலிகளுக்குத் தேவையான குறியீடு தொகுப்புகளை அட்டவணைப்படுத்துவதற்கு.
  • temp_location — பைப்லைன் இயங்கும் போது உருவாக்கப்பட்ட தற்காலிக வேலை கோப்புகளை சேமிப்பதற்காக கிளவுட் டேட்டாஃப்ளோ கிளவுட் சேமிப்பகத்திற்கான பாதை.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

இந்த கட்டளை இயங்கும் போது, ​​நாம் google console இல் DataFlow தாவலுக்குச் சென்று நமது பைப்லைனைப் பார்க்கலாம். பைப்லைனில் க்ளிக் செய்யும் போது, ​​படம் 4ஐப் போன்ற ஒன்றைப் பார்க்க வேண்டும். பிழைத்திருத்த நோக்கங்களுக்காக, பதிவுகளுக்குச் சென்று பின்னர் ஸ்டாக்டிரைவருக்குச் சென்று விரிவான பதிவுகளைப் பார்ப்பது மிகவும் உதவியாக இருக்கும். இது பல சந்தர்ப்பங்களில் பைப்லைன் சிக்கல்களைத் தீர்க்க எனக்கு உதவியது.

நாங்கள் ஸ்ட்ரீம் தரவு செயலாக்க பைப்லைனை உருவாக்குகிறோம். பகுதி 2
படம் 4: பீம் கன்வேயர்

BigQuery இல் எங்கள் தரவை அணுகவும்

எனவே, எங்கள் அட்டவணையில் தரவு பாயும் ஒரு பைப்லைன் ஏற்கனவே இருக்க வேண்டும். இதைச் சோதிக்க, நாம் BigQueryக்குச் சென்று தரவைப் பார்க்கலாம். கீழே உள்ள கட்டளையைப் பயன்படுத்திய பிறகு, தரவுத்தொகுப்பின் முதல் சில வரிசைகளைப் பார்க்க வேண்டும். இப்போது BigQuery இல் தரவு சேமிக்கப்பட்டுள்ளதால், நாங்கள் மேலும் பகுப்பாய்வு செய்யலாம், அத்துடன் தரவை சக ஊழியர்களுடன் பகிர்ந்து கொள்ளலாம் மற்றும் வணிகக் கேள்விகளுக்கு பதிலளிக்கத் தொடங்கலாம்.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

நாங்கள் ஸ்ட்ரீம் தரவு செயலாக்க பைப்லைனை உருவாக்குகிறோம். பகுதி 2
படம் 5: BigQuery

முடிவுக்கு

ஸ்ட்ரீமிங் டேட்டா பைப்லைனை உருவாக்குவதற்கும், தரவை மேலும் அணுகக்கூடிய வழிகளைக் கண்டறிவதற்கும் இந்த இடுகை ஒரு பயனுள்ள எடுத்துக்காட்டு என நம்புகிறோம். இந்த வடிவத்தில் தரவுகளை சேமிப்பது நமக்கு பல நன்மைகளை அளிக்கிறது. எங்கள் தயாரிப்பை எத்தனை பேர் பயன்படுத்துகிறார்கள் போன்ற முக்கியமான கேள்விகளுக்கு இப்போது பதிலளிக்க ஆரம்பிக்கலாம். உங்கள் பயனர் எண்ணிக்கை காலப்போக்கில் வளர்ந்து வருகிறதா? தயாரிப்பின் எந்த அம்சங்களை மக்கள் அதிகம் தொடர்பு கொள்கிறார்கள்? மேலும் இருக்கக்கூடாத இடங்களில் பிழைகள் உள்ளதா? இந்த கேள்விகள் நிறுவனத்திற்கு ஆர்வமாக இருக்கும். இந்தக் கேள்விகளுக்கான பதில்களில் இருந்து வெளிப்படும் நுண்ணறிவுகளின் அடிப்படையில், தயாரிப்பை மேம்படுத்தலாம் மற்றும் பயனர் ஈடுபாட்டை அதிகரிக்கலாம்.

இந்த வகை உடற்பயிற்சிக்கு பீம் மிகவும் பயனுள்ளதாக இருக்கிறது மற்றும் பல சுவாரஸ்யமான பயன்பாட்டு நிகழ்வுகளையும் கொண்டுள்ளது. எடுத்துக்காட்டாக, நீங்கள் நிகழ்நேரத்தில் ஸ்டாக் டிக் தரவை பகுப்பாய்வு செய்ய விரும்பலாம் மற்றும் பகுப்பாய்வின் அடிப்படையில் வர்த்தகம் செய்யலாம், ஒருவேளை வாகனங்களில் இருந்து வரும் சென்சார் தரவு உங்களிடம் இருக்கலாம் மற்றும் போக்குவரத்து நிலை கணக்கீடுகளைக் கணக்கிட விரும்பலாம். எடுத்துக்காட்டாக, நீங்கள் ஒரு கேமிங் நிறுவனமாக இருக்கலாம், அது பயனர் தரவைச் சேகரித்து, முக்கிய அளவீடுகளைக் கண்காணிக்க டாஷ்போர்டுகளை உருவாக்க அதைப் பயன்படுத்துகிறது. சரி, அன்பர்களே, இது மற்றொரு இடுகைக்கான தலைப்பு, படித்ததற்கு நன்றி, மேலும் முழு குறியீட்டைப் பார்க்க விரும்புபவர்களுக்கு, எனது GitHub க்கான இணைப்பு கீழே உள்ளது.

https://github.com/DFoly/User_log_pipeline

அவ்வளவுதான். பகுதி ஒன்றைப் படியுங்கள்.

ஆதாரம்: www.habr.com

கருத்தைச் சேர்