Hi all! We are sharing the translation of the final part of the article, prepared specifically for students of the course.
Apache Beam and DataFlow for real-time pipelines
Setting up Google Cloud
Note: I used Google Cloud Shell to run the pipeline and publish the user log data, because I had trouble running the pipeline under Python 3. Google Cloud Shell uses Python 2, which works better with Apache Beam.
To get the pipeline up and running, we need to dig into the settings a bit. Those of you who have never used GCP before will need to complete the six setup steps described in this guide.
After that, we need to upload our scripts to Google Cloud Storage and copy them into our Google Cloud Shell. Uploading to cloud storage is quite trivial (a description can be found here).
Figure 2
The commands we need to copy the files and install the required libraries are listed below.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creating our database and table
Once we've completed all the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console: first create a dataset, then a table. You can follow the steps in this guide.
Figure 3. Table schema
Publishing the user log data
Pub/Sub is a critical component of our pipeline, as it allows multiple independent applications to communicate with each other. Specifically, it acts as an intermediary, letting us send and receive messages between applications. The first thing we need to do is create a topic: simply go to Pub/Sub in the console and click CREATE TOPIC.
The code below calls our script to generate the log data defined above, then connects to Pub/Sub and sends the logs. The only things we need to do are create a PublisherClient object, build the topic path with the topic_path method, and call the publish function with that topic path and the data. Note that we import generate_log_line from our stream_logs script, so make sure those files are in the same folder, otherwise you will get an import error. We can then run this from our Google console using:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time

PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())

if __name__ == '__main__':
    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
Once the file is running, we can watch the log data being printed to the console, as shown in the figure below. This script will keep running until we stop it with CTRL+C.
Figure 4. Output of publish_logs.py
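The stream_logs script itself is not listed in this post. For reference, here is a minimal stdlib-only sketch of what a generate_log_line function could look like (the field values here are hypothetical; the original script uses the Faker library to produce them):

```python
import random
import time

def generate_log_line():
    """Build a single nginx-style log line from randomized, made-up fields."""
    ip = ".".join(str(random.randint(1, 255)) for _ in range(4))
    timestamp = time.strftime("%d/%b/%Y:%H:%M:%S")
    request = random.choice(["GET /index.html HTTP/1.1", "POST /login HTTP/1.1"])
    status = random.choice(["200", "404", "500"])
    body_bytes = str(random.randint(100, 5000))
    referer = "https://example.com/"
    user_agent = "Mozilla/5.0"
    return '{} - - [{}] "{}" {} {} "{}" "{}"'.format(
        ip, timestamp, request, status, body_bytes, referer, user_agent)
```

publish.py only cares that generate_log_line returns a string, so any generator with this signature will work.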
Writing Our Pipeline Code
Now that we've got everything set up, we can get to the fun part: coding our pipeline using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we've created a pipeline object, we can apply multiple functions one after another using the pipe operator (|). In general, the workflow looks like the figure below.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
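To see why this syntax composes left to right, here is a toy illustration (not Beam itself, just a plain-Python class with an overloaded | operator) of how each transform consumes the output of the previous one:

```python
class Collection:
    """A tiny stand-in for a PCollection: holds values and chains with |."""
    def __init__(self, values):
        self.values = list(values)

    def __or__(self, fn):
        # Applying a "transform" yields a new Collection, as Beam's | does.
        return Collection(fn(v) for v in self.values)

output = (Collection([1, 2, 3])
          | (lambda x: x * 10)   # First Transform
          | (lambda x: x + 1))   # Second Transform

print(output.values)  # [11, 21, 31]
```

Beam's real | additionally builds a deferred execution graph rather than running eagerly, but the left-to-right chaining reads the same way.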
In our code, we will create two user-defined functions. The regex_clean function scans the data and, for each pattern in the PATTERNS list, extracts the matching substring using re.search; it returns a comma-separated string. If you're not an expert on regular expressions, I recommend checking this out. The second is a custom ParDo function called Split, a Beam transform for parallel processing: in Python it is written as a class inheriting from DoFn. It takes the comma-separated string from the previous step and returns a list of dictionaries whose keys match the column names of our BigQuery table. One thing to note about this function: I had to import datetime inside the function to make it work; I was getting an error importing it at the top of the file, which was odd. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and streaming code is that in the batch version we read the CSV from src_path using Beam's ReadFromText function.
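The timestamp reformatting that Split performs can be checked in isolation; this snippet shows the same strptime/strftime round trip on a sample nginx-style local time:

```python
from datetime import datetime

# nginx's $time_local looks like 26/Apr/2019:09:15:32 (zone offset stripped here)
d = datetime.strptime("26/Apr/2019:09:15:32", "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
print(date_string)  # 2019-04-26 09:15:32
```

BigQuery accepts this "%Y-%m-%d %H:%M:%S" form directly in STRING columns, which is why Split rewrites the timestamp before the write stage.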
Batch DataFlow Job (batch processing)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res

class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main():
    p = beam.Pipeline(options=PipelineOptions())

    (p
        | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
        | "clean address" >> beam.Map(regex_clean)
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
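It is worth sanity-checking the patterns locally before launching a job. For example, the bracket lookaround pattern used for the timestamp field, and the whitespace-delimited digit pattern used for the status code, can be exercised on a hypothetical log line like this:

```python
import re

line = '192.168.1.5 - - [26/Apr/2019:09:15:32] "GET /index.html HTTP/1.1" 200 1024'

# Lookbehind/lookahead keep only the text between the square brackets.
timestamp = re.search(r'(?<=\[).+?(?=\])', line).group()
print(timestamp)  # 26/Apr/2019:09:15:32

# The status code is the first run of digits surrounded by spaces
# after the quoted request string.
status = re.search(r'\s(\d+)\s', line).group(1)
print(status)  # 200
```

Running each pattern against a handful of real lines in a notebook is much cheaper than discovering a mismatch in the Dataflow logs.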
Streaming DataFlow Job (stream processing)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res

class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        # Field order matches regex_clean's output, same as in the batch job.
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions())

    (p
        | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
        | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
        | "Clean Data" >> beam.Map(regex_clean)
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
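The only streaming-specific step before the shared cleaning logic is the Decode transform: ReadFromPubSub hands the pipeline raw bytes, which the lambda turns back into a UTF-8 string. A standalone illustration with a hypothetical payload:

```python
# What ReadFromPubSub yields: the bytes our publisher produced
# with message.encode('utf-8') in publish.py.
payload = '192.168.1.5 - - [26/Apr/2019:09:15:32] "GET / HTTP/1.1" 200 512'.encode('utf-8')

# This is exactly what the "Decode" >> beam.Map(...) step does per element.
decoded = payload.decode('utf-8')
print(decoded.split(" ")[0])  # 192.168.1.5
```

From this point on, the streaming and batch jobs apply the identical regex_clean and Split steps.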
Running the pipeline
We can start the pipeline in several different ways. If we wanted to, we could just run it locally from a terminal while logged in to GCP remotely:
python main_pipeline_stream.py \
--input_topic "projects/user-logs-237110/topics/userlogs" \
--streaming
However, we are going to run it with DataFlow. We can do this with the command below by setting the following required parameters:
project - the ID of your GCP project.
runner - the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
staging_location - a Cloud Dataflow cloud storage path for staging the code packages needed by the workers performing the job.
temp_location - a Cloud Dataflow cloud storage path for temporary job files created while the pipeline runs.
streaming - a flag telling Dataflow to run the pipeline in streaming mode.
python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming
While this command is running, we can go to the DataFlow tab in the Google Cloud console and view our pipeline. Clicking on the pipeline, we should see something similar to the figure below. For debugging, it can be very useful to go to the logs, and from there to Stackdriver, to view detailed logs. This has helped me solve pipeline problems in a number of cases.
Figure 4: Beam pipeline
Accessing our data in BigQuery
So, we should already have a pipeline running with data coming into our table. To test this, we can go to BigQuery and view the data. After using the command below, you should see the first few lines of the data set. Now that we have the data stored in BigQuery, we can do further analysis as well as share the data with colleagues and start answering business questions.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5: BigQuery
Conclusion
We hope this post will serve as a useful example of building a streaming data pipeline, as well as finding ways to make data more accessible. Storing data in this format gives us many advantages. Now we can start answering important questions like how many people use our product? Is the user base growing over time? What aspects of the product do people interact with the most? And are there errors where they shouldn't be? These are the questions that will be of interest to the organization. Based on the insights generated from the answers to these questions, we will be able to improve the product and increase user engagement.
Beam is really useful for this type of exercise, and it has a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on the analysis; maybe you have sensor data coming from vehicles and want to compute traffic levels. You could also, for example, be a gaming company collecting user data and using it to build dashboards tracking key metrics. Okay, gentlemen, that's a topic for another post. Thanks for reading, and for those who want to see the full code, below is a link to my GitHub.
That's all.
Source: habr.com