Ўсім прывітанне. Дзелімся перакладам заключнай часткі артыкула, падрыхтаванага спецыяльна для студэнтаў курса
Apache Beam і DataFlow для канвеераў рэальнага часу
Настройка Google Cloud
Заўвага: Для запуску канвеера і публікацыі дадзеных карыстацкага лога я выкарыстоўваў Google Cloud Shell, паколькі ў мяне ўзніклі праблемы з запускам канвеера на Python 3. Google Cloud Shell выкарыстоўвае Python 2, які лепш узгадняецца з Apache Beam.
Каб запусціць канвеер, нам трэба крыху пакапацца ў наладах. Тым з вас, хто раней не карыстаўся GCP, неабходна выканаць наступныя 6 крокаў, прыведзеных на гэтай
Пасля гэтага нам трэба будзе загрузіць нашы скрыпты ў хмарнае сховішча Google і скапіяваць іх у нашу Google Cloud Shel. Загрузка ў хмарнае сховішча досыць трывіяльная (апісанне можна знайсці
Малюнак 2
Каманды, якія нам патрэбны для капіявання файлаў і ўстаноўкі неабходных бібліятэк, пералічаны ніжэй.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Стварэнне нашай базы даных і табліцы
Пасля таго, як мы выканалі ўсе крокі, звязаныя з настройкай, наступнае, што нам трэба зрабіць, гэта стварыць набор дадзеных і табліцу ў BigQuery. Ёсць некалькі спосабаў зрабіць гэта, але самы просты - выкарыстоўваць кансоль Google Cloud, спачатку стварыўшы набор дадзеных. Вы можаце выканаць дзеянні, указаныя па наступнай
Малюнак 3. Схема табліцы
Публікацыя дадзеных карыстацкага лога
Pub/Sub з'яўляецца крытычна важным кампанентам нашага канвеера, паколькі дазваляе некалькім незалежным дадаткам ўзаемадзейнічаць адзін з адным. У прыватнасці, ён працуе як пасярэднік, які дазваляе нам адпраўляць і атрымліваць паведамленні паміж праграмамі. Першае, што трэба зрабіць, гэта стварыць тэму (topic). Досыць проста перайсці ў Pub/Sub у кансолі і націснуць CREATE TOPIC.
Прыведзены ніжэй код выклікае наш скрыпт для генерацыі дадзеных лога, вызначаных вышэй, а затым падлучаецца і адпраўляе часопісы ў Pub/Sub. Адзінае, што нам трэба зрабіць, - гэта стварыць аб'ект. PublisherClient, пазначыць шлях да тэмы з дапамогай метаду topic_path
і выклікаць функцыю publish
с topic_path
і дадзенымі. Звярніце ўвагу, што мы імпартуем generate_log_line
з нашага скрыпту stream_logs
, таму пераканайцеся, што гэтыя файлы знаходзяцца ў адной тэчцы, інакш вы атрымаеце памылку імпарту. Затым мы можам запусціць гэта праз нашу google-кансоль, выкарыстоўваючы:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Як толькі файл запусціцца, мы зможам назіраць выснову дадзеных лога на кансоль, як паказана на малюнку ніжэй. Гэты скрыпт будзе працаваць да таго часу, пакуль мы не выкарыстоўваем CTRL + C, Каб завяршыць яго.
Малюнак 4. Выснова publish_logs.py
Напісанне кода нашага канвеера
Цяпер, калі мы ўсё падрыхтавалі, мы можам прыступіць да самай цікавай часткі - напісання кода нашага канвеера, выкарыстоўваючы Beam і Python. Каб стварыць Beam-канвеер, нам трэба стварыць аб'ект канвеера (p). Пасля таго як мы стварылі аб'ект канвеера, мы можам прымяніць некалькі функцый адну за адной, выкарыстоўваючы аператар pipe (|)
. Увогуле, працоўны працэс выглядае як на малюнку ніжэй.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
У нашым кодзе мы створым дзве карыстацкія функцыі. Функцыю regex_clean
, якая скануе дадзеныя і здабывае адпаведны радок на аснове спісу PATTERNS, выкарыстоўваючы функцыю re.search
. Функцыя вяртае падзелены коскамі радок. Калі вы не з'яўляецеся экспертам па рэгулярных выразах, я рэкамендую азнаёміцца з гэтым datetime
ўнутры функцыі, каб яна працавала. Я атрымліваў паведамленне аб памылцы пры імпарце ў пачатку файла, што было дзіўна. Гэты спіс затым перадаецца ў функцыю WriteToBigQuery, Якая проста дадае нашы дадзеныя ў табліцу. Код для Batch DataFlow Job і Streaming DataFlow Job прыведзены ніжэй. Адзінае адрозненне паміж пакетным і струменевым кодам складаецца ў тым, што ў пакетнай апрацоўцы мы чытэльны CSV з src_path
, выкарыстоўваючы функцыю ReadFromText
з Beam.
Batch DataFlow Job (апрацоўка пакетаў)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow Job (апрацоўка патоку)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Запуск канвеера
Мы можам запусціць канвеер некалькімі рознымі спосабамі. Калі б мы захацелі, мы маглі б проста запусціць яго лакальна з тэрмінала, выдалена увайшоўшы ў GCP.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Аднак мы збіраемся запусціць яго з дапамогай DataFlow. Мы можам зрабіць гэта з дапамогай ніжэйпрыведзенай каманды, усталяваўшы наступныя абавязковыя параметры.
project
- ID вашага праекта GCP.runner
- сродак запуску канвеера, якое прааналізуе вашу праграму і сканструюе ваш канвеер. Для выканання ў воблаку вы павінны пазначыць DataflowRunner.staging_location
- шлях да хмарнага сховішча Cloud Dataflow для індэксавання пакетаў кода, неабходных апрацоўшчыкам, якія выконваюць працу.temp_location
- шлях да хмарнага сховішча Cloud Dataflow для размяшчэння часовых файлаў заданняў, створаных падчас працы канвеера.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Пакуль гэтая каманда выконваецца, мы можам перайсці на ўкладку DataFlow у google-кансолі і прагледзець наш канвеер. Клікнуўшы па канвееры, мы павінны ўбачыць нешта падобнае на малюнак 4. У мэтах адладкі можа быць вельмі карысна перайсці ў логі, а затым у Stackdriver для прагляду падрабязных логаў. Гэта дапамагло мне вырашыць праблемы з канвеерам у шэрагу выпадкаў.
Малюнак 4: Beam-канвеер
Доступ да нашых дадзеных у BigQuery
Такім чынам, у нас ужо павінен быць запушчаны канвеер з дадзенымі, якія паступаюць у нашу табліцу. Каб праверыць гэта, мы можам перайсці да BigQuery і прагледзець дадзеныя. Пасля выкарыстання каманды ніжэй вы павінны ўбачыць першыя некалькі радкоў набору даных. Цяпер, калі ў нас ёсць дадзеныя, якія захоўваюцца ў BigQuery, мы можам правесці далейшы аналіз, а таксама падзяліцца дадзенымі з калегамі і пачаць адказваць на бізнес-пытанні.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Малюнак 5: BigQuery
Заключэнне
Спадзяемся, што гэтая пасада паслужыць карысным прыкладам стварэння струменевага канвеера дадзеных, а таксама пошуку спосабаў зрабіць дадзеныя больш даступнымі. Захоўванне дадзеных у такім фармаце дае нам шмат пераваг. Цяпер мы можам пачаць адказваць на важныя пытанні, напрыклад, колькі людзей выкарыстоўваюць наш прадукт? Ці расце з часам база карыстальнікаў? З якімі аспектамі прадукта людзі ўзаемадзейнічаюць больш за ўсё? І ці ёсць памылкі, там дзе іх быць не павінна? Гэта тыя пытанні, якія будуць цікавыя для арганізацыі. На аснове ідэй, якія вынікаюць з адказаў на гэтыя пытанні, мы зможам удасканаліць прадукт і павысіць зацікаўленасць карыстальнікаў.
Beam сапраўды карысны для такога тыпу практыкаванняў, а таксама мае шэраг іншых цікавых выпадкаў ужывання. Напрыклад, вы можаце аналізаваць дадзеныя па біржавым цікам ў рэжыме рэальнага часу і здзяйсняць здзелкі на аснове аналізу, магчыма, у вас ёсць дадзеныя датчыкаў, якія паступаюць з транспартных сродкаў, і вы хочаце вылічыць разлік ўзроўню трафіку. Вы таксама можаце, напрыклад, быць гульнявой кампаніяй, якая збірае дадзеныя аб карыстачах і выкарыстоўвалай яе для стварэння інфармацыйных панэляў для адсочвання ключавых паказчыкаў. Добра, спадары, гэта тэма ўжо для іншай пасады, дзякуй за чытанне, а для тых, хто хоча ўбачыць поўны код, ніжэй спасылка на мой GitHub.
На гэтым усё.
Крыніца: habr.com