Building a pipeline for streaming data processing. Part 1

Hi all! We are sharing a translation of this article, prepared specifically for students of the Data Engineer course. Enjoy!


Apache Beam and Dataflow for real-time pipelines

Today's post is based on a task I recently worked on at my job. I was really excited to implement it and to write it up as a blog post, because it gave me a chance to do some data engineering and to build something genuinely useful for my team. Not long ago, I discovered that our systems store a fairly large user log associated with one of our data products. It turned out that nobody was using this data, so I immediately became curious about what we could learn if we started analyzing it regularly. There were, however, several problems along the way. The first was that the data was stored in many different text files and was not available for immediate analysis. The second was that it was stored in a closed system, so I couldn't use any of my favorite data analysis tools.

I had to figure out how to make this data easier for us to access and how to add value by incorporating it into some of our user experience solutions. After thinking it over for a while, I decided to build a pipeline to move this data into a cloud database so that the team and I could access it and start drawing some conclusions. Having completed the Data Engineering specialization on Coursera some time ago, I was eager to apply some of the tools from the course in my own project.

So hosting the data in a cloud database seemed like a reasonable way to solve my first problem, but what could I do about problem number two? Luckily, there was a way to move this data into an environment where I could access tools like Python and Google Cloud Platform (GCP). However, it was a long process, so I needed something that would let me continue development while I waited for the data transfer to finish. The solution I came up with was to generate fake data using the Faker library in Python. I had never used this library before, but I quickly realized how useful it is. This approach allowed me to start writing code and testing the pipeline without the actual data.

With that said, in this post I will explain how I built the above pipeline using some of the technologies available in GCP. In particular, I will use Apache Beam (the Python SDK), Dataflow, Pub/Sub, and BigQuery to collect user logs, transform the data, and load it into a database for further analysis. In my case, I only needed Beam's batch functionality, since my data was not arriving in real time, so Pub/Sub was not required. However, I'll stick with the streaming version, since that's something you might well encounter in practice.

Introduction to GCP and Apache Beam

Google Cloud Platform provides a set of really useful tools for processing big data. Here are some of the tools I will be using:

  • Pub/Sub is a messaging service built on the publisher-subscriber pattern that allows us to receive data in real time (see the short publishing sketch right after this list).
  • Dataflow is a service that simplifies the creation of data pipelines and automatically handles tasks such as infrastructure scaling, which means we can focus solely on writing the code for our pipeline.
  • BigQuery is a cloud data warehouse. If you are familiar with other SQL databases, it won't take you long to get comfortable with BigQuery.
  • Finally, we will be using Apache Beam, specifically the Python version, to build our pipeline. This tool lets us create a pipeline for streaming or batch processing that integrates with GCP. It is especially useful for parallel processing and is well suited to extract, transform, load (ETL) tasks, so if we need to move data from one place to another while applying transformations or calculations, Beam is a good choice.
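To make the Pub/Sub bullet a bit more concrete, here is a minimal sketch of publishing a message to a topic with the google-cloud-pubsub client. The project and topic names are placeholders of my own, not from the original article.

from google.cloud import pubsub_v1

# Placeholder project and topic names -- substitute your own.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-gcp-project", "user-logs")

# Pub/Sub payloads are bytes; publish() returns a future that
# resolves to the message ID once the message has been accepted.
future = publisher.publish(topic_path, data=b"example log line")
print(future.result())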

There is a wide variety of tools available in GCP, so it can be difficult to keep track of them all and what each one is for, but here is a summary of them for reference.

Visualization of our pipeline

Let's visualize the components of our pipeline in a single diagram (Figure 1 below). At a high level, we want to collect user data in real time, process it, and load it into BigQuery. Logs are created when users interact with the product: they send requests to the server, and those requests are logged. This data can be especially useful for understanding how users interact with our product and whether it is working properly. In general, the pipeline consists of the steps shown in Figure 1.

Beam makes this process very easy, whether we have a streaming data source or a CSV file we want to process in batches. You will see later that only minimal code changes are needed to switch between the two. This is one of the benefits of using Beam.
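To illustrate, here is a minimal sketch of what such a Beam pipeline could look like in Python. The topic, table, and intermediate steps are placeholder names of my own rather than the article's actual code, which comes later in the series.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder names -- substitute your own project, topic, and table.
INPUT_TOPIC = "projects/my-gcp-project/topics/user-logs"
OUTPUT_TABLE = "my-gcp-project:user_logs.logdata"

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    (p
     # For batch processing, the first two steps could be replaced with a
     # single beam.io.ReadFromText("user_logs.csv") step, which already
     # yields strings -- the rest of the pipeline stays the same.
     | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic=INPUT_TOPIC)
     | "Decode" >> beam.Map(lambda msg: msg.decode("utf-8"))
     # Real parsing into the individual log fields happens later in the
     # article; here each raw line is just wrapped into a one-column row.
     | "To row" >> beam.Map(lambda line: {"line": line})
     | "Write to BigQuery" >> beam.io.WriteToBigQuery(
           OUTPUT_TABLE, schema="line:STRING"))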

Figure 1: Main data pipeline

Creating Pseudo Data with Faker

As I mentioned earlier, due to limited access to the data, I decided to create pseudo data in the same format as the actual data. This was a really rewarding exercise, as I could write code and test the pipeline while waiting for the data. I suggest taking a look at the Faker documentation if you want to know what else this library has to offer. Our user data will generally look like the example below. Based on this format, we can generate data line by line to simulate real-time data. These logs give us information such as the date, the request type, the response from the server, the IP address, and so on.

192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" [401] 155 "https://harris-lopez.com/categories/about/" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_2) AppleWebKit/5312 (KHTML, like Gecko) Chrome/34.0.855.0 Safari/5312"

Based on the line above, we want to create our LINE variable using the eight variables in curly braces below. We'll also be using them as field names in our table schema a bit later.

LINE = """
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"
"""

If we were doing batch processing, the code would be very similar, except that we would need to generate a set of samples over some time range. To use Faker, we simply create an object and call the methods we need. Faker was particularly useful for generating IP addresses and website URLs. I used the following methods:

fake.ipv4()
fake.uri_path()
fake.uri()
fake.user_agent()

from faker import Faker
import time
import random
import os
import numpy as np
from datetime import datetime, timedelta


# Template for a single log line; each curly-brace field is filled in
# by generate_log_line() below.
LINE = """
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"
"""


def generate_log_line():
    """Generate a single fake log line in the format shown above."""
    fake = Faker()
    now = datetime.now()

    # Fake client IP address and the current timestamp in access-log format.
    remote_addr = fake.ipv4()
    time_local = now.strftime('%d/%b/%Y:%H:%M:%S')

    # Random HTTP method and a fake request path.
    request_type = random.choice(["GET", "POST", "PUT"])
    request_path = "/" + fake.uri_path()

    # Mostly successful responses, with occasional 401 and 404 errors.
    status = np.random.choice([200, 401, 404], p=[0.9, 0.05, 0.05])
    body_bytes_sent = random.choice(range(5, 1000, 1))
    http_referer = fake.uri()
    http_user_agent = fake.user_agent()

    log_line = LINE.format(
        remote_addr=remote_addr,
        time_local=time_local,
        request_type=request_type,
        request_path=request_path,
        status=status,
        body_bytes_sent=body_bytes_sent,
        http_referer=http_referer,
        http_user_agent=http_user_agent
    )

    return log_line
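To turn these single lines into something resembling a real-time feed while testing, a simple loop like the hedged sketch below could be used. The sleep interval is arbitrary, and the loop is my own illustration rather than the article's script, which continues in the next part.

# Minimal sketch: emit a fake log line every few seconds to simulate a stream.
if __name__ == "__main__":
    while True:
        print(generate_log_line().strip())
        time.sleep(random.uniform(1, 5))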

That's the end of the first part.

In the coming days, we will share the continuation of the article, and for now, as usual, we look forward to your comments ;-).

Source: habr.com
