Ствараем канвеер струменевай апрацоўкі дадзеных. Частка 2

Ўсім прывітанне. Дзелімся перакладам заключнай часткі артыкула, падрыхтаванага спецыяльна для студэнтаў курса "Data Engineer". З першай часткай можна азнаёміцца тут.

Apache Beam і DataFlow для канвеераў рэальнага часу

Ствараем канвеер струменевай апрацоўкі дадзеных. Частка 2

Настройка Google Cloud

Заўвага: Для запуску канвеера і публікацыі дадзеных карыстацкага лога я выкарыстоўваў Google Cloud Shell, паколькі ў мяне ўзніклі праблемы з запускам канвеера на Python 3. Google Cloud Shell выкарыстоўвае Python 2, які лепш узгадняецца з Apache Beam.

Каб запусціць канвеер, нам трэба крыху пакапацца ў наладах. Тым з вас, хто раней не карыстаўся GCP, неабходна выканаць наступныя 6 крокаў, прыведзеных на гэтай старонцы.

Пасля гэтага нам трэба будзе загрузіць нашы скрыпты ў хмарнае сховішча Google і скапіяваць іх у нашу Google Cloud Shel. Загрузка ў хмарнае сховішча досыць трывіяльная (апісанне можна знайсці тут). Каб скапіяваць нашы файлы, мы можам адкрыць Google Cloud Shel з панэлі інструментаў, пстрыкнуўшы першы значок злева на малюнку 2 ніжэй.

Ствараем канвеер струменевай апрацоўкі дадзеных. Частка 2
Малюнак 2

Каманды, якія нам патрэбны для капіявання файлаў і ўстаноўкі неабходных бібліятэк, пералічаны ніжэй.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Стварэнне нашай базы даных і табліцы

Пасля таго, як мы выканалі ўсе крокі, звязаныя з настройкай, наступнае, што нам трэба зрабіць, гэта стварыць набор дадзеных і табліцу ў BigQuery. Ёсць некалькі спосабаў зрабіць гэта, але самы просты - выкарыстоўваць кансоль Google Cloud, спачатку стварыўшы набор дадзеных. Вы можаце выканаць дзеянні, указаныя па наступнай спасылцы, Каб стварыць табліцу са схемай. Наша табліца будзе мець 7 слупкоў, Якія адпавядаюць кампанентам кожнага карыстацкага лога. Для выгоды мы вызначым усе слупкі як радкі (тып string), за выключэннем зменнай timelocal, і назавем іх у адпаведнасці са зменнымі, якія мы згенеравалі раней. Схема нашай табліцы павінна выглядаць як на рысунку 3.

Ствараем канвеер струменевай апрацоўкі дадзеных. Частка 2
Малюнак 3. Схема табліцы

Публікацыя дадзеных карыстацкага лога

Pub/Sub з'яўляецца крытычна важным кампанентам нашага канвеера, паколькі дазваляе некалькім незалежным дадаткам ўзаемадзейнічаць адзін з адным. У прыватнасці, ён працуе як пасярэднік, які дазваляе нам адпраўляць і атрымліваць паведамленні паміж праграмамі. Першае, што трэба зрабіць, гэта стварыць тэму (topic). Досыць проста перайсці ў Pub/Sub у кансолі і націснуць CREATE TOPIC.

Прыведзены ніжэй код выклікае наш скрыпт для генерацыі дадзеных лога, вызначаных вышэй, а затым падлучаецца і адпраўляе часопісы ў Pub/Sub. Адзінае, што нам трэба зрабіць, - гэта стварыць аб'ект. PublisherClient, пазначыць шлях да тэмы з дапамогай метаду topic_path і выклікаць функцыю publish с topic_path і дадзенымі. Звярніце ўвагу, што мы імпартуем generate_log_line з нашага скрыпту stream_logs, таму пераканайцеся, што гэтыя файлы знаходзяцца ў адной тэчцы, інакш вы атрымаеце памылку імпарту. Затым мы можам запусціць гэта праз нашу google-кансоль, выкарыстоўваючы:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Як толькі файл запусціцца, мы зможам назіраць выснову дадзеных лога на кансоль, як паказана на малюнку ніжэй. Гэты скрыпт будзе працаваць да таго часу, пакуль мы не выкарыстоўваем CTRL + C, Каб завяршыць яго.

Ствараем канвеер струменевай апрацоўкі дадзеных. Частка 2
Малюнак 4. Выснова publish_logs.py

Напісанне кода нашага канвеера

Цяпер, калі мы ўсё падрыхтавалі, мы можам прыступіць да самай цікавай часткі - напісання кода нашага канвеера, выкарыстоўваючы Beam і Python. Каб стварыць Beam-канвеер, нам трэба стварыць аб'ект канвеера (p). Пасля таго як мы стварылі аб'ект канвеера, мы можам прымяніць некалькі функцый адну за адной, выкарыстоўваючы аператар pipe (|). Увогуле, працоўны працэс выглядае як на малюнку ніжэй.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

У нашым кодзе мы створым дзве карыстацкія функцыі. Функцыю regex_clean, якая скануе дадзеныя і здабывае адпаведны радок на аснове спісу PATTERNS, выкарыстоўваючы функцыю re.search. Функцыя вяртае падзелены коскамі радок. Калі вы не з'яўляецеся экспертам па рэгулярных выразах, я рэкамендую азнаёміцца ​​з гэтым тутарыялам і папрактыкавацца ў блакноце, каб праверыць код. Пасля гэтага мы вызначаем карыстацкую ParDo-функцыю пад назовам расшчэплены, Якая з'яўляецца варыяцыяй Beam-пераўтварэнні для паралельнай апрацоўкі. У Python гэта робіцца адмысловым спосабам - мы павінны стварыць клас, які ўспадкоўваецца ад класа DoFn Beam. Функцыя Split прымае распаршаны радок з папярэдняй функцыі і вяртае спіс слоўнікаў з ключамі, якія адпавядаюць імёнам слупкоў у нашай табліцы BigQuery. Ёсць сёе-тое, што варта адзначыць пра гэтую функцыю: мне прыйшлося імпартаваць datetime ўнутры функцыі, каб яна працавала. Я атрымліваў паведамленне аб памылцы пры імпарце ў пачатку файла, што было дзіўна. Гэты спіс затым перадаецца ў функцыю WriteToBigQuery, Якая проста дадае нашы дадзеныя ў табліцу. Код для Batch DataFlow Job і Streaming DataFlow Job прыведзены ніжэй. Адзінае адрозненне паміж пакетным і струменевым кодам складаецца ў тым, што ў пакетнай апрацоўцы мы чытэльны CSV з src_path, выкарыстоўваючы функцыю ReadFromText з Beam.

Batch DataFlow Job (апрацоўка пакетаў)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (апрацоўка патоку)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Запуск канвеера

Мы можам запусціць канвеер некалькімі рознымі спосабамі. Калі б мы захацелі, мы маглі б проста запусціць яго лакальна з тэрмінала, выдалена увайшоўшы ў GCP.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Аднак мы збіраемся запусціць яго з дапамогай DataFlow. Мы можам зрабіць гэта з дапамогай ніжэйпрыведзенай каманды, усталяваўшы наступныя абавязковыя параметры.

  • project - ID вашага праекта GCP.
  • runner - сродак запуску канвеера, якое прааналізуе вашу праграму і сканструюе ваш канвеер. Для выканання ў воблаку вы павінны пазначыць DataflowRunner.
  • staging_location - шлях да хмарнага сховішча Cloud Dataflow для індэксавання пакетаў кода, неабходных апрацоўшчыкам, якія выконваюць працу.
  • temp_location - шлях да хмарнага сховішча Cloud Dataflow для размяшчэння часовых файлаў заданняў, створаных падчас працы канвеера.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Пакуль гэтая каманда выконваецца, мы можам перайсці на ўкладку DataFlow у google-кансолі і прагледзець наш канвеер. Клікнуўшы па канвееры, мы павінны ўбачыць нешта падобнае на малюнак 4. У мэтах адладкі можа быць вельмі карысна перайсці ў логі, а затым у Stackdriver для прагляду падрабязных логаў. Гэта дапамагло мне вырашыць праблемы з канвеерам у шэрагу выпадкаў.

Ствараем канвеер струменевай апрацоўкі дадзеных. Частка 2
Малюнак 4: Beam-канвеер

Доступ да нашых дадзеных у BigQuery

Такім чынам, у нас ужо павінен быць запушчаны канвеер з дадзенымі, якія паступаюць у нашу табліцу. Каб праверыць гэта, мы можам перайсці да BigQuery і прагледзець дадзеныя. Пасля выкарыстання каманды ніжэй вы павінны ўбачыць першыя некалькі радкоў набору даных. Цяпер, калі ў нас ёсць дадзеныя, якія захоўваюцца ў BigQuery, мы можам правесці далейшы аналіз, а таксама падзяліцца дадзенымі з калегамі і пачаць адказваць на бізнес-пытанні.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Ствараем канвеер струменевай апрацоўкі дадзеных. Частка 2
Малюнак 5: BigQuery

Заключэнне

Спадзяемся, што гэтая пасада паслужыць карысным прыкладам стварэння струменевага канвеера дадзеных, а таксама пошуку спосабаў зрабіць дадзеныя больш даступнымі. Захоўванне дадзеных у такім фармаце дае нам шмат пераваг. Цяпер мы можам пачаць адказваць на важныя пытанні, напрыклад, колькі людзей выкарыстоўваюць наш прадукт? Ці расце з часам база карыстальнікаў? З якімі аспектамі прадукта людзі ўзаемадзейнічаюць больш за ўсё? І ці ёсць памылкі, там дзе іх быць не павінна? Гэта тыя пытанні, якія будуць цікавыя для арганізацыі. На аснове ідэй, якія вынікаюць з адказаў на гэтыя пытанні, мы зможам удасканаліць прадукт і павысіць зацікаўленасць карыстальнікаў.

Beam сапраўды карысны для такога тыпу практыкаванняў, а таксама мае шэраг іншых цікавых выпадкаў ужывання. Напрыклад, вы можаце аналізаваць дадзеныя па біржавым цікам ў рэжыме рэальнага часу і здзяйсняць здзелкі на аснове аналізу, магчыма, у вас ёсць дадзеныя датчыкаў, якія паступаюць з транспартных сродкаў, і вы хочаце вылічыць разлік ўзроўню трафіку. Вы таксама можаце, напрыклад, быць гульнявой кампаніяй, якая збірае дадзеныя аб карыстачах і выкарыстоўвалай яе для стварэння інфармацыйных панэляў для адсочвання ключавых паказчыкаў. Добра, спадары, гэта тэма ўжо для іншай пасады, дзякуй за чытанне, а для тых, хто хоча ўбачыць поўны код, ніжэй спасылка на мой GitHub.

https://github.com/DFoly/User_log_pipeline

На гэтым усё. Чытаць першую частку.

Крыніца: habr.com

Дадаць каментар