Салом ба ҳама. Мо тарҷумаи қисми ниҳоии мақоларо, ки махсус барои донишҷӯёни курс таҳия шудааст, мубодила мекунем.
Apache Beam ва DataFlow барои лӯлаҳои вақти воқеӣ
Насб кардани Google Cloud
Эзоҳ: Ман Google Cloud Shell-ро барои иҷро кардани лӯла ва интишори маълумоти сабти фармоишӣ истифода мебурдам, зеро дар идоракунии қубур дар Python 3 мушкилӣ доштам. Google Cloud Shell Python 2-ро истифода мебарад, ки бо Apache Beam мувофиқтар аст.
Барои оғоз кардани қубур, мо бояд каме ба танзимот кобед. Барои онҳое, ки қаблан GCP-ро истифода накардаанд, ба шумо лозим меояд, ки 6 қадами дар ин бора зикршударо иҷро кунед
Пас аз ин, мо бояд скриптҳои худро ба Google Cloud Storage бор кунем ва онҳоро ба Google Cloud Shel нусхабардорӣ кунем. Боргузорӣ ба анбори абр хеле ночиз аст (тавсифро метавон пайдо кард
Расми 2
Фармонҳое, ки мо бояд файлҳоро нусхабардорӣ кунем ва китобхонаҳои лозимиро насб кунем, дар зер оварда шудаанд.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Эҷоди пойгоҳи додаҳо ва ҷадвали мо
Пас аз он ки мо ҳамаи қадамҳои марбут ба танзимро анҷом додем, чизи дигаре, ки мо бояд анҷом диҳем, дар BigQuery маҷмӯи додаҳо ва ҷадвал эҷод кардан аст. Якчанд роҳҳо барои ин вуҷуд доранд, аммо соддатаринаш истифодаи консоли Google Cloud тавассути эҷоди маҷмӯи додаҳо мебошад. Шумо метавонед қадамҳои зеринро иҷро кунед
Расми 3. Тартиби ҷадвал
Интишори маълумоти сабти корбар
Pub/Sub ҷузъи муҳими лӯлаи мост, зеро он имкон медиҳад, ки барномаҳои сершумори мустақил бо ҳамдигар муошират кунанд. Аз ҷумла, он ҳамчун миёнарав кор мекунад, ки ба мо имкон медиҳад, ки паёмҳоро байни барномаҳо ирсол ва қабул кунем. Аввалин чизе, ки мо бояд анҷом диҳем, эҷод кардани мавзӯъ аст. Танҳо дар консол ба Pub/Sub равед ва Эҷоди МАВЗУро пахш кунед.
Рамзи дар поён овардашуда скрипти моро даъват мекунад, то маълумоти сабти дар боло муайяншударо тавлид кунад ва пас гузоришҳоро ба Pub/Sub пайваст ва мефиристад. Ягона чизе, ки мо бояд кунем, сохтани объект аст PublisherClient, бо истифода аз усул роҳи мавзӯъро муайян кунед topic_path
ва функсияро даъват кунед publish
с topic_path
ва маълумот. Лутфан қайд кунед, ки мо ворид мекунем generate_log_line
аз скрипти мо stream_logs
, бинобар ин боварӣ ҳосил кунед, ки ин файлҳо дар як ҷузвдон ҳастанд, вагарна шумо хатои воридотро мегиред. Пас мо метавонем инро тавассути консоли google-и худ бо истифода аз:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Ҳамин ки файл кор мекунад, мо метавонем баромади маълумоти сабтро ба консол бубинем, тавре ки дар расми зер нишон дода шудааст. Ин скрипт то он даме, ки мо истифода набарем, кор хоҳад кард CTRL + Cба анчом расондани он.
Расми 4. Натиҷа publish_logs.py
Навиштани рамзи қубури мо
Акнун, ки мо ҳама чизро омода кардаем, мо метавонем қисми шавқоварро оғоз кунем - рамзгузории лӯлаи худро бо истифода аз Beam ва Python. Барои сохтани лӯлаи Beam, мо бояд объекти қубурро созем (р). Пас аз он ки мо объекти қубурро эҷод кардем, мо метавонем бо истифода аз оператор якчанд вазифаҳоро пай дар пай татбиқ кунем pipe (|)
. Умуман, ҷараёни кор ба тасвири дар поён овардашуда монанд аст.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Дар рамзи мо, мо ду функсияи фармоиширо эҷод мекунем. Функсия regex_clean
, ки маълумотро скан мекунад ва сатри мувофиқро дар асоси рӯйхати PATTERNS бо истифода аз функсия бармегардонад re.search
. Функсия сатри аз вергул ҷудошударо бармегардонад. Агар шумо коршиноси ифодаи муқаррарӣ набошед, ман тавсия медиҳам, ки инро тафтиш кунед datetime
дар дохили функсия барои кор кардани он. Ман дар оғози файл хатои воридотӣ гирифтам, ки аҷиб буд. Пас аз ин рӯйхат ба функсия интиқол дода мешавад WriteToBigQuery, ки танҳо маълумоти моро ба ҷадвал илова мекунад. Рамзи Batch DataFlow Job ва Streaming DataFlow Job дар зер оварда шудааст. Ягона фарқияти байни коди партия ва ҷараён дар он аст, ки дар партия мо CSV-ро аз он мехонем src_path
бо истифода аз функсия ReadFromText
аз Beam.
Batch DataFlow Job (коркарди партия)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Ҷараёни DataFlow Job (коркарди ҷараён)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Ба кор андохтани конвейер
Мо метавонем қубурро бо якчанд роҳҳои гуногун гузаронем. Агар мо мехостем, мо метавонем онро танҳо аз терминал ҳангоми ворид шудан ба GCP дурдаст ба таври маҳаллӣ иҷро кунем.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Аммо, мо онро бо истифода аз DataFlow иҷро мекунем. Мо метавонем ин корро бо истифода аз фармони зер тавассути танзими параметрҳои зарурии зерин иҷро кунем.
project
— ID-и лоиҳаи GCP-и шумо.runner
як давандаи қубур аст, ки барномаи шуморо таҳлил мекунад ва лӯлаи шуморо месозад. Барои дар абр кор кардан, шумо бояд DataflowRunner-ро муайян кунед.staging_location
— роҳ ба нигоҳдории абрии Cloud Dataflow барои индексатсияи бастаҳои кодҳо, ки ба коркардкунандагони ин кор лозиманд.temp_location
— роҳ ба анбори абрии Cloud Dataflow барои нигоҳ доштани файлҳои кори муваққатӣ, ки ҳангоми кор кардани қубур сохта шудаанд.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Ҳангоме ки ин фармон иҷро мешавад, мо метавонем ба ҷадвали DataFlow дар консоли google равем ва лӯлаи худро бубинем. Вақте ки мо қубурро пахш мекунем, мо бояд чизе монандро ба расми 4 бубинем. Бо мақсади ислоҳ, рафтан ба Гузоришҳо ва сипас ба Stackdriver барои дидани гузоришҳои муфассал хеле муфид аст. Ин ба ман кӯмак кард, ки дар як қатор ҳолатҳо мушкилоти қубурро ҳал кунам.
Расми 4: конвейери нур
Ба маълумоти мо дар BigQuery дастрасӣ пайдо кунед
Ҳамин тавр, мо бояд аллакай лӯлае дошта бошем, ки маълумот ба ҷадвали мо ҷорӣ мешавад. Барои санҷидани ин, мо метавонем ба BigQuery рафта, маълумотро бубинем. Пас аз истифодаи фармони дар поён овардашуда шумо бояд чанд сатри аввали маҷмӯи маълумотро бинед. Акнун, ки мо маълумоти дар BigQuery захирашударо дорем, мо метавонем таҳлили минбаъда гузаронем, инчунин маълумотро бо ҳамкорон мубодила кунем ва ба саволҳои тиҷоратӣ ҷавоб диҳем.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Расми 5: BigQuery
хулоса
Мо умедворем, ки ин паём ҳамчун як намунаи муфид барои эҷоди лӯлаи интиқоли маълумот ва инчунин дарёфти роҳҳои дастрас кардани маълумот хидмат мекунад. Нигоҳ доштани маълумот дар ин формат ба мо бартариҳои зиёд медиҳад. Акнун мо метавонем ба саволҳои муҳим ҷавоб диҳем, масалан чанд нафар аз маҳсулоти моро истифода мебаранд? Оё пойгоҳи корбарони шумо бо мурури замон афзоиш меёбад? Одамон бо кадом ҷанбаҳои маҳсулот бештар муошират мекунанд? Ва оё хатоҳое ҳастанд, ки дар он ҷо набояд вуҷуд дошта бошад? Инҳо саволҳое ҳастанд, ки барои созмон таваҷҷӯҳ хоҳанд кард. Дар асоси фаҳмишҳое, ки аз ҷавобҳо ба ин саволҳо бармеоянд, мо метавонем маҳсулотро такмил диҳем ва ҷалби корбаронро зиёд кунем.
Beam барои ин намуди машқ воқеан муфид аст ва як қатор ҳолатҳои дигари ҷолиби истифода низ дорад. Масалан, шумо метавонед маълумотро дар вақти воқеӣ таҳлил кунед ва дар асоси таҳлил савдо кунед, шояд шумо маълумоти сенсорӣ аз мошинҳо дошта бошед ва мехоҳед ҳисобҳои сатҳи трафикро ҳисоб кунед. Шумо инчунин метавонед, масалан, як ширкати бозӣ бошед, ки маълумоти корбарро ҷамъоварӣ мекунад ва онро барои эҷоди панелҳои идоракунӣ барои пайгирии ченакҳои калидӣ истифода мебарад. Хуб, ҷанобон, ин мавзӯъ барои мақолаи дигар аст, ташаккур барои хондан ва барои онҳое, ки мехоҳанд рамзи пурраро бубинанд, дар зер истиноди GitHub-и ман аст.
Ҳамааш ҳамин.
Манбаъ: will.com