Створюємо конвеєр потокової обробки даних. Частина 2

Всім привіт. Ділимося перекладом заключної частини статті, підготовленої спеціально для студентів курсу "Data Engineer". З першою частиною можна ознайомитись тут.

Apache Beam та DataFlow для конвеєрів реального часу

Створюємо конвеєр потокової обробки даних. Частина 2

Налаштування Google Cloud

Примітка: Для запуску конвеєра та публікації даних користувача лога я використовував Google Cloud Shell, оскільки у мене виникли проблеми із запуском конвеєра на Python 3. Google Cloud Shell використовує Python 2, який краще узгоджується з Apache Beam.

Щоб запустити конвеєр, нам потрібно трохи покопатися у налаштуваннях. Тим, хто раніше не користувався GCP, необхідно виконати наступні 6 кроків, наведених на цій сторінці.

Після цього нам потрібно буде завантажити наші скрипти у хмарне сховище Google та скопіювати їх у нашу Google Cloud Shel. Завантаження в хмарне сховище досить тривіальне (опис можна знайти тут). Щоб скопіювати наші файли, ми можемо відкрити Google Cloud Shel із панелі інструментів, клацнувши перший значок зліва на малюнку 2 нижче.

Створюємо конвеєр потокової обробки даних. Частина 2
Малюнок 2

Команди, які нам потрібні для копіювання файлів та встановлення необхідних бібліотек, наведено нижче.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Створення нашої бази даних та таблиці

Після того, як ми виконали всі кроки, пов'язані з налаштуванням, наступне, що нам потрібно зробити, це створити набір даних та таблицю у BigQuery. Є кілька способів зробити це, але найпростіший – використовувати консоль Google Cloud, спочатку створивши набір даних. Ви можете виконати дії, вказані за наступною за посиланнямстворити таблицю зі схемою. Наша таблиця матиме 7 стовпців, відповідних компонентам кожного лога користувача. Для зручності ми визначимо всі стовпці як рядки (тип string), за винятком змінної timelocal, і назвемо їх відповідно до змінних, які ми згенерували раніше. Схема нашої таблиці має виглядати як малюнку 3.

Створюємо конвеєр потокової обробки даних. Частина 2
Малюнок 3. Схема таблиці

Публікація даних користувача лога

Pub/Sub є критично важливим компонентом нашого конвеєра, оскільки дозволяє кільком незалежним програмам взаємодіяти один з одним. Зокрема, він працює як посередник, що дозволяє нам надсилати та отримувати повідомлення між додатками. Перше, що потрібно зробити, це створити тему (topic). Достатньо просто перейти в Pub/Sub у консолі та натиснути CREATE TOPIC.

Наведений нижче код викликає наш скрипт для генерації даних лога, визначених вище, а потім підключається та відправляє журнали до Pub/Sub. Єдине, що нам потрібно зробити, це створити об'єкт PublisherClient, вказати шлях до теми за допомогою методу topic_path та викликати функцію publish с topic_path та даними. Зверніть увагу, що ми імпортуємо generate_log_line з нашого скрипту stream_logsтому переконайтеся, що ці файли знаходяться в одній папці, інакше ви отримаєте помилку імпорту. Потім ми можемо запустити це через нашу google-консоль, використовуючи:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Як тільки файл запуститься, ми зможемо спостерігати виведення даних лога на консоль, як показано нижче. Цей скрипт буде працювати доти, доки ми не використовуємо CTRL + C, щоб завершити його.

Створюємо конвеєр потокової обробки даних. Частина 2
Малюнок 4. Висновок publish_logs.py

Написання коду нашого конвеєра

Тепер, коли ми всі підготували, ми можемо розпочати найцікавішу частину — написання коду нашого конвеєра, використовуючи Beam і Python. Щоб створити Beam-конвеєр, потрібно створити об'єкт конвеєра (p). Після того, як ми створили об'єкт конвеєра, ми можемо застосувати кілька функцій одну за одною, використовуючи оператор pipe (|). Загалом робочий процес виглядає як на малюнку нижче.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

У нашому коді ми створимо дві функції користувача. функцію regex_clean, яка сканує дані та витягує відповідний рядок на основі списку PATTERNS, використовуючи функцію re.search. Функція повертає розділений комами рядок. Якщо ви не є експертом з регулярних виразів, я рекомендую ознайомитися з цим туторіалом та попрактикуватися в блокноті, щоб перевірити код. Після цього ми визначаємо користувальницьку ParDo-функцію під назвою розщепленийяка є варіацією Beam-перетворення для паралельної обробки. У Python це робиться особливим способом – ми маємо створити клас, який успадковується від класу DoFn Beam. Функція Split приймає розпаршений рядок з попередньої функції та повертає список словників із ключами, що відповідають іменам стовпців у нашій таблиці BigQuery. Є дещо, що треба сказати про цю функцію: мені довелося імпортувати datetime всередині функції, щоб вона працювала. Я отримував повідомлення про помилку під час імпорту на початку файлу, що було дивно. Цей список потім передається у функцію WriteToBigQuery, яка просто додає наші дані до таблиці. Код Batch DataFlow Job та Streaming DataFlow Job наведено нижче. Єдина відмінність між пакетним і потоковим кодом полягає в тому, що в пакетній обробці ми читаємо CSV з src_path, використовуючи функцію ReadFromText з Beam.

Batch DataFlow Job (обробка пакетів)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (обробка потоку)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Запуск конвеєра

Ми можемо запустити конвеєр кількома різними способами. Якби ми захотіли, ми могли б просто запустити його локально з терміналу, віддалено увійшовши до GCP.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Однак, ми збираємося запустити його за допомогою DataFlow. Ми можемо зробити це за допомогою наведеної нижче команди, встановивши наступні обов'язкові параметри.

  • project - ID вашого проекту GCP.
  • runner - Засіб запуску конвеєра, який проаналізує вашу програму і сконструює ваш конвеєр. Для виконання у хмарі ви повинні вказати DataflowRunner.
  • staging_location — шлях до хмарного сховища Cloud Dataflow для індексування пакетів коду, необхідних обробникам, які виконують роботу.
  • temp_location - шлях до хмарного сховища Cloud Dataflow для розміщення тимчасових файлів завдань, створених під час роботи конвеєра.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Поки ця команда виконується, ми можемо перейти на вкладку DataFlow у google-консолі та переглянути наш конвеєр. Клікнувши конвеєром, ми повинні побачити щось схоже на малюнок 4. З метою налагодження може бути дуже корисно перейти в логи, а потім в Stackdriver для перегляду докладних логів. Це допомогло мені вирішити проблеми з конвеєром у ряді випадків.

Створюємо конвеєр потокової обробки даних. Частина 2
Малюнок 4: Beam-конвеєр

Доступ до наших даних у BigQuery

Отже, у нас вже має бути запущений конвеєр із даними, що надходять до нашої таблиці. Щоб перевірити це, ми можемо перейти до BigQuery та переглянути дані. Після використання команди нижче, ви повинні побачити перші кілька рядків набору даних. Тепер, коли ми маємо дані, що зберігаються в BigQuery, ми можемо провести подальший аналіз, а також поділитися даними з колегами та почати відповідати на бізнес-питання.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Створюємо конвеєр потокової обробки даних. Частина 2
Малюнок 5: BigQuery

Висновок

Сподіваємося, що цей пост стане корисним прикладом створення потокового конвеєра даних, а також пошуку способів зробити дані більш доступними. Зберігання даних у такому форматі дає нам багато переваг. Тепер ми можемо почати відповідати на важливі питання, наприклад, скільки людей використовують наш продукт? Чи з часом зростає база користувачів? З якими аспектами продукту люди взаємодіють найбільше? І чи є помилки, де їх бути не повинно? Це ті питання, які будуть цікавими для організації. На основі ідей, що випливають із відповідей на ці запитання, ми зможемо вдосконалити продукт та підвищити зацікавленість користувачів.

Beam дійсно корисний для такого типу вправ, а також має низку інших цікавих випадків використання. Наприклад, ви можете аналізувати дані з біржових тиків у режимі реального часу і здійснювати угоди на основі аналізу, можливо, у вас є дані датчиків, що надходять з транспортних засобів, і ви хочете обчислити розрахунок рівня трафіку. Ви також можете, наприклад, бути ігровою компанією, яка збирає дані про користувачів та використовує її для створення інформаційних панелей для відстеження ключових показників. Гаразд, панове, це тема вже для іншого посту, дякую за читання, а для тих, хто хоче побачити повний код нижче посилання на мій GitHub.

https://github.com/DFoly/User_log_pipeline

На цьому все. Читати першу частину.

Джерело: habr.com

Додати коментар або відгук