We create a pipeline for streaming data processing. Part 2

Hi all. We are sharing the translation of the final part of this article, prepared specifically for students of the Data Engineer course. The first part can be found here.

Apache Beam and DataFlow for real-time pipelines


Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the user log data because I was having trouble running the pipeline in Python 3. Google Cloud Shell uses Python 2, which works better with Apache Beam.

To get the pipeline up and running, we need to dig into the settings a bit. For those of you who have never used GCP before, you will need to complete the 6 setup steps described on this page.

After that, we will need to upload our scripts to Google Cloud Storage and copy them into our Google Cloud Shell. Uploading to cloud storage is quite trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need to copy the files and install the required libraries are listed below.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our database and table

Once we've completed all the setup steps, the next thing we need to do is create a dataset and table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console, creating a dataset first. You can follow the steps at the following link to create a table with a schema. Our table will have 7 columns, corresponding to the components of each user log. For convenience, we will define all columns as strings, except for the timelocal variable, and name them according to the variables we generated earlier. Our table schema should look like Figure 3.

Figure 3. Table schema
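
If you prefer to create the dataset and table from code rather than through the console, a minimal sketch using the google-cloud-bigquery client could look like the following. The dataset name userlogs and table name logdata match what the pipeline code below writes to; everything is kept as STRING here to match the schema string used in the pipeline code, though you could use TIMESTAMP for timelocal instead.

from google.cloud import bigquery

client = bigquery.Client(project='user-logs-237110')  # replace with your own project ID

# Create the dataset that will hold the log table
# (older google-cloud-bigquery versions may require a Dataset object instead of a plain string)
dataset = client.create_dataset('userlogs')

# The 7 columns described above, one per component of a user log line
schema = [
    bigquery.SchemaField('remote_addr', 'STRING'),
    bigquery.SchemaField('timelocal', 'STRING'),
    bigquery.SchemaField('request_type', 'STRING'),
    bigquery.SchemaField('status', 'STRING'),
    bigquery.SchemaField('body_bytes_sent', 'STRING'),
    bigquery.SchemaField('http_referer', 'STRING'),
    bigquery.SchemaField('http_user_agent', 'STRING'),
]

table_ref = dataset.table('logdata')
client.create_table(bigquery.Table(table_ref, schema=schema))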

Publication of user log data

Pub/Sub is a critical component of our pipeline, as it allows multiple independent applications to communicate with each other. Specifically, it acts as an intermediary that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
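
If you would rather create the topic from code, a short sketch with the Pub/Sub client is shown below. Note that the create_topic signature differs between releases of google-cloud-pubsub; the positional form below follows the older API this article uses, while newer releases expect keyword arguments.

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('user-logs-237110', 'userlogs')  # your project ID and topic name

# Older google-cloud-pubsub releases accept the topic path positionally;
# newer ones want publisher.create_topic(name=topic_path).
publisher.create_topic(topic_path)
print('Created topic: {}'.format(topic_path))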

The code below calls the script that generates the log data described above, then connects to Pub/Sub and sends the logs to it. The only thing we need to do is create a PublisherClient object, specify the topic path using the topic_path method, and call the publish function with topic_path and the data. Please note that we import generate_log_line from our stream_logs script, so make sure those files are in the same folder or you will get an import error. We can then run this from our Google Cloud Shell using:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    # Pub/Sub expects bytes, so encode the log line before publishing
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Once the file is run, we can watch the log data being written to the console, as shown in the figure below. This script will keep running until we stop it with CTRL + C.

Figure 4. Output of publish_logs.py
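
Seeing log lines printed locally does not by itself prove that they reached Pub/Sub, so it can be worth pulling a few messages back. The sketch below creates a subscription called verify-sub (a name chosen purely for illustration) and pulls a handful of messages; it uses the older google-cloud-pubsub call style this article relies on, while newer releases wrap these calls in request= keyword arguments.

from google.cloud import pubsub_v1

PROJECT_ID = 'user-logs-237110'
TOPIC = 'userlogs'

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(PROJECT_ID, TOPIC)
subscription_path = subscriber.subscription_path(PROJECT_ID, 'verify-sub')

# Create a subscription on the userlogs topic (only needs to be done once)
subscriber.create_subscription(subscription_path, topic_path)

# Pull up to 5 messages, print them, and acknowledge them so they are not redelivered
response = subscriber.pull(subscription_path, max_messages=5)
for received in response.received_messages:
    print(received.message.data.decode('utf-8'))
subscriber.acknowledge(subscription_path, [r.ack_id for r in response.received_messages])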

Writing Our Pipeline Code

Now that we've got everything set up, we can get to the fun part: coding our pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we've created the pipeline object, we can apply multiple transforms one after another using the pipe operator (|). In general, the workflow looks like the figure below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
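
As a quick illustration of that pattern (not part of our pipeline itself), here is a tiny self-contained example that runs locally with the default DirectRunner; each transform in the chain receives the PCollection produced by the one before it.

import apache_beam as beam

def show(value):
    # Print each element so we can inspect the final PCollection locally
    print(value)
    return value

with beam.Pipeline() as p:  # uses the local DirectRunner by default
    (p
     | 'Create' >> beam.Create(['10,20', '3,4'])              # initial input PCollection
     | 'SplitLine' >> beam.Map(lambda line: line.split(','))  # first transform
     | 'SumParts' >> beam.Map(lambda parts: sum(int(x) for x in parts))  # second transform
     | 'Show' >> beam.Map(show))                              # third transform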

In our code, we will create two user-defined functions. The first, regex_clean, scans the data and extracts the matching substrings based on the list of PATTERNS using re.search, returning a comma-separated string. If you're not a regular expression expert, I recommend checking out this tutorial and practicing in a notebook to test the code. We then define a custom ParDo function called Split, which is a variant of a Beam transform for parallel processing. In Python, this is done in a special way: we must create a class that inherits from Beam's DoFn class. The Split function takes the parsed string from the previous function and returns a list of dictionaries with keys corresponding to the column names in our BigQuery table. There is something to note about this function: I had to import datetime inside the function to make it work; I was getting an error on the import at the top of the file, which was weird. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and streaming code is that in the batch version we read a CSV from src_path using Beam's ReadFromText function.

Batch DataFlow Job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()
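
Before submitting anything to DataFlow, it can save time to sanity check the Split DoFn locally, for example in an interactive session with the class definition above pasted in. The record below is made up purely for illustration; it just follows the comma-separated form that regex_clean emits, with the timestamp in the %d/%b/%Y:%H:%M:%S format that Split parses.

# Hypothetical, already-cleaned record with the 7 comma-separated fields Split expects
sample = '192.168.1.10,30/Apr/2019:21:11:42,GET /home HTTP/1.1,200,1234,https://example.com,Mozilla/5.0'

rows = Split().process(sample)
print(rows[0])
# Expected: a dict whose keys match the BigQuery columns, e.g.
# {'remote_addr': '192.168.1.10', 'timelocal': '2019-04-30 21:11:42', ...}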

Streaming DataFlow Job (stream processing)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s',r'(?<=\[).+?(?=\])',
           r'\"(\S+)\s(\S+)\s*(\S*)\"',r'\s(\d+)\s',r"(?<=\[).\d+(?=\])",
           r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Running the pipeline

We can start the pipeline in several different ways. If we wanted to, we could just run it locally from a terminal while logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it with DataFlow. We can do this with the command below, setting the following required parameters.

  • project - the ID of your GCP project.
  • runner - the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location - a Cloud Dataflow cloud storage path for staging the code packages needed by the workers performing the job.
  • temp_location - a Cloud Dataflow cloud storage path for temporary job files created while the pipeline runs.
  • streaming - a flag telling DataFlow to run the job in streaming mode rather than as a batch.

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming

While this command is running, we can go to the DataFlow tab in the Google Cloud console and view our pipeline. Clicking on the pipeline, we should see something similar to Figure 4. For debugging purposes, it can be very useful to go to the logs and then to Stackdriver to see the detailed logs. This has helped me solve pipeline problems in a number of cases.

Figure 4: Beam pipeline

Accessing our data in BigQuery

So, we should now have a pipeline running with data flowing into our table. To check this, we can go to BigQuery and view the data. After running the query below, you should see the first few rows of the dataset. Now that we have the data stored in BigQuery, we can do further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Figure 5: BigQuery
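
The same query can of course be issued from Python, which is convenient once you start building analyses or dashboards on top of the table. Here is a minimal sketch with the google-cloud-bigquery client, counting requests per status code as an example of the kind of business question discussed below.

from google.cloud import bigquery

client = bigquery.Client(project='user-logs-237110')  # replace with your own project ID

query = """
    SELECT status, COUNT(*) AS requests
    FROM `user-logs-237110.userlogs.logdata`
    GROUP BY status
    ORDER BY requests DESC
"""

# Run the query and iterate over the resulting rows
for row in client.query(query).result():
    print(row.status, row.requests)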

Conclusion

We hope this post serves as a useful example of building a streaming data pipeline, and of finding ways to make data more accessible. Storing data in this format gives us many advantages. We can now start answering important questions such as: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with the most? And are there errors where there shouldn't be? These are the questions that will interest the organization, and based on the insights drawn from their answers we can improve the product and increase user engagement.

Beam is really useful for this type of exercise, and it has a number of other interesting use cases as well. For example, you could analyze stock tick data in real time and make trades based on the analysis; you might have sensor data coming from vehicles and want to calculate traffic levels; or you could be a gaming company that collects user data and uses it to build dashboards tracking key metrics. Okay folks, that's a topic for another post. Thanks for reading, and for those who want to see the full code, below is a link to my GitHub.

https://github.com/DFoly/User_log_pipeline

That's all. Read the first part.

Source: habr.com
