рдирдорд╕реНрддреЗред рд╣рдо рд▓реЗрдЦ рдХреЗ рдЕрдВрддрд┐рдо рднрд╛рдЧ рдХрд╛ рдЕрдиреБрд╡рд╛рдж рд╕рд╛рдЭрд╛ рдХрд░ рд░рд╣реЗ рд╣реИрдВ, рдЬреЛ рд╡рд┐рд╢реЗрд╖ рд░реВрдк рд╕реЗ рдкрд╛рдареНрдпрдХреНрд░рдо рдХреЗ рдЫрд╛рддреНрд░реЛрдВ рдХреЗ рд▓рд┐рдП рддреИрдпрд╛рд░ рдХрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИред
рд░реАрдпрд▓-рдЯрд╛рдЗрдо рдкрд╛рдЗрдкрд▓рд╛рдЗрдиреЛрдВ рдХреЗ рд▓рд┐рдП рдЕрдкрд╛рдЪреЗ рдмреАрдо рдФрд░ рдбреЗрдЯрд╛рдлрд╝реНрд▓реЛ
Google рдХреНрд▓рд╛рдЙрдб рдХреА рд╕реНрдерд╛рдкрдирд╛
рдиреЛрдЯ: рдореИрдВрдиреЗ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рд╛рдиреЗ рдФрд░ рдХрд╕реНрдЯрдо рд▓реЙрдЧ рдбреЗрдЯрд╛ рдкреНрд░рдХрд╛рд╢рд┐рдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП Google рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд┐рдпрд╛ рдХреНрдпреЛрдВрдХрд┐ рдореБрдЭреЗ рдкрд╛рдпрдерди 3 рдореЗрдВ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рд╛рдиреЗ рдореЗрдВ рд╕рдорд╕реНрдпрд╛ рд╣реЛ рд░рд╣реА рдереАред Google рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓ рдкрд╛рдпрдерди 2 рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддрд╛ рд╣реИ, рдЬреЛ рдЕрдкрд╛рдЪреЗ рдмреАрдо рдХреЗ рд╕рд╛рде рдЕрдзрд┐рдХ рд╕реБрд╕рдВрдЧрдд рд╣реИред
рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╢реБрд░реВ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рд╣рдореЗрдВ рд╕реЗрдЯрд┐рдВрдЧреНрд╕ рдореЗрдВ рдереЛрдбрд╝рд╛ рдЧрд╣рд░рд╛рдИ рд╕реЗ рдЬрд╛рдиреЗ рдХреА рдЬрд░реВрд░рдд рд╣реИред рдЖрдк рдореЗрдВ рд╕реЗ рдЙрди рд▓реЛрдЧреЛрдВ рдХреЗ рд▓рд┐рдП рдЬрд┐рдиреНрд╣реЛрдВрдиреЗ рдкрд╣рд▓реЗ GCP рдХрд╛ рдЙрдкрдпреЛрдЧ рдирд╣реАрдВ рдХрд┐рдпрд╛ рд╣реИ, рдЖрдкрдХреЛ рдЗрд╕рдореЗрдВ рдЙрд▓реНрд▓рд┐рдЦрд┐рдд рдирд┐рдореНрдирд▓рд┐рдЦрд┐рдд 6 рдЪрд░рдгреЛрдВ рдХрд╛ рдкрд╛рд▓рди рдХрд░рдирд╛ рд╣реЛрдЧрд╛
рдЗрд╕рдХреЗ рдмрд╛рдж, рд╣рдореЗрдВ рдЕрдкрдиреА рд╕реНрдХреНрд░рд┐рдкреНрдЯреНрд╕ рдХреЛ Google рдХреНрд▓рд╛рдЙрдб рд╕реНрдЯреЛрд░реЗрдЬ рдкрд░ рдЕрдкрд▓реЛрдб рдХрд░рдирд╛ рд╣реЛрдЧрд╛ рдФрд░ рдЙрдиреНрд╣реЗрдВ рдЕрдкрдиреЗ Google рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓ рдореЗрдВ рдХреЙрдкреА рдХрд░рдирд╛ рд╣реЛрдЧрд╛ред рдХреНрд▓рд╛рдЙрдб рд╕реНрдЯреЛрд░реЗрдЬ рдкрд░ рдЕрдкрд▓реЛрдб рдХрд░рдирд╛ рдХрд╛рдлреА рд╕рд╛рдорд╛рдиреНрдп рд╣реИ (рд╡рд┐рд╡рд░рдг рдкрд╛рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ)ред
рдЪрд┐рддреНрд░рд╛ 2
рдлрд╝рд╛рдЗрд▓реЛрдВ рдХреА рдкреНрд░рддрд┐рд▓рд┐рдкрд┐ рдмрдирд╛рдиреЗ рдФрд░ рдЖрд╡рд╢реНрдпрдХ рд▓рд╛рдЗрдмреНрд░реЗрд░реА рд╕реНрдерд╛рдкрд┐рдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рд╣рдореЗрдВ рдЬрд┐рди рдЖрджреЗрд╢реЛрдВ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ, рд╡реЗ рдиреАрдЪреЗ рд╕реВрдЪреАрдмрджреНрдз рд╣реИрдВред
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
рд╣рдорд╛рд░рд╛ рдбреЗрдЯрд╛рдмреЗрд╕ рдФрд░ рддрд╛рд▓рд┐рдХрд╛ рдмрдирд╛рдирд╛
рдПрдХ рдмрд╛рд░ рдЬрдм рд╣рдо рд╕реЗрдЯрдЕрдк рд╕рдВрдмрдВрдзреА рд╕рднреА рдЪрд░рдг рдкреВрд░реЗ рдХрд░ рд▓реЗрддреЗ рд╣реИрдВ, рддреЛ рдЕрдЧрд▓реА рдЪреАрдЬрд╝ рдЬреЛ рд╣рдореЗрдВ рдХрд░рдиреЗ рдХреА рдЬрд╝рд░реВрд░рдд рд╣реЛрддреА рд╣реИ рд╡рд╣ рд╣реИ BigQuery рдореЗрдВ рдПрдХ рдбреЗрдЯрд╛рд╕реЗрдЯ рдФрд░ рддрд╛рд▓рд┐рдХрд╛ рдмрдирд╛рдирд╛ред рдРрд╕рд╛ рдХрд░рдиреЗ рдХреЗ рдХрдИ рддрд░реАрдХреЗ рд╣реИрдВ, рд▓реЗрдХрд┐рди рд╕рдмрд╕реЗ рдЖрд╕рд╛рди рд╣реИ рдкрд╣рд▓реЗ рдбреЗрдЯрд╛рд╕реЗрдЯ рдмрдирд╛рдХрд░ Google рдХреНрд▓рд╛рдЙрдб рдХрдВрд╕реЛрд▓ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдирд╛ред рдЖрдк рдиреАрдЪреЗ рджрд┐рдП рдЧрдП рдЪрд░рдгреЛрдВ рдХрд╛ рдкрд╛рд▓рди рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ
рдЪрд┐рддреНрд░ 3. рдЯреЗрдмрд▓ рд▓реЗрдЖрдЙрдЯ
рдЙрдкрдпреЛрдЧрдХрд░реНрддрд╛ рд▓реЙрдЧ рдбреЗрдЯрд╛ рдкреНрд░рдХрд╛рд╢рд┐рдд рдХрд░рдирд╛
рдкрдм/рд╕рдм рд╣рдорд╛рд░реА рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдХрд╛ рдПрдХ рдорд╣рддреНрд╡рдкреВрд░реНрдг рдШрдЯрдХ рд╣реИ рдХреНрдпреЛрдВрдХрд┐ рдпрд╣ рдХрдИ рд╕реНрд╡рддрдВрддреНрд░ рдЕрдиреБрдкреНрд░рдпреЛрдЧреЛрдВ рдХреЛ рдПрдХ рджреВрд╕рд░реЗ рдХреЗ рд╕рд╛рде рд╕рдВрдЪрд╛рд░ рдХрд░рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддрд╛ рд╣реИред рд╡рд┐рд╢реЗрд╖ рд░реВрдк рд╕реЗ, рдпрд╣ рдПрдХ рдордзреНрдпрд╕реНрде рдХреЗ рд░реВрдк рдореЗрдВ рдХрд╛рдо рдХрд░рддрд╛ рд╣реИ рдЬреЛ рд╣рдореЗрдВ рдЕрдиреБрдкреНрд░рдпреЛрдЧреЛрдВ рдХреЗ рдмреАрдЪ рд╕рдВрджреЗрд╢ рднреЗрдЬрдиреЗ рдФрд░ рдкреНрд░рд╛рдкреНрдд рдХрд░рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддрд╛ рд╣реИред рдкрд╣рд▓реА рдЪреАрдЬрд╝ рдЬреЛ рд╣рдореЗрдВ рдХрд░рдиреЗ рдХреА рдЬрд╝рд░реВрд░рдд рд╣реИ рд╡рд╣ рд╣реИ рдПрдХ рд╡рд┐рд╖рдп рдмрдирд╛рдирд╛ред рдмрд╕ рдХрдВрд╕реЛрд▓ рдореЗрдВ рдкрдм/рд╕рдм рдкрд░ рдЬрд╛рдПрдВ рдФрд░ рдХреНрд░рд┐рдПрдЯ рдЯреЙрдкрд┐рдХ рдкрд░ рдХреНрд▓рд┐рдХ рдХрд░реЗрдВред
рдиреАрдЪреЗ рджрд┐рдпрд╛ рдЧрдпрд╛ рдХреЛрдб рдКрдкрд░ рдкрд░рд┐рднрд╛рд╖рд┐рдд рд▓реЙрдЧ рдбреЗрдЯрд╛ рдЙрддреНрдкрдиреНрди рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рд╣рдорд╛рд░реА рд╕реНрдХреНрд░рд┐рдкреНрдЯ рдХреЛ рдХреЙрд▓ рдХрд░рддрд╛ рд╣реИ рдФрд░ рдлрд┐рд░ рдХрдиреЗрдХреНрдЯ рдХрд░рддрд╛ рд╣реИ рдФрд░ рд▓реЙрдЧ рдХреЛ рдкрдм/рд╕рдм рдкрд░ рднреЗрдЬрддрд╛ рд╣реИред рдХреЗрд╡рд▓ рдПрдХ рдЪреАрдЬ рдЬреЛ рд╣рдореЗрдВ рдХрд░рдиреЗ рдХреА рдЬрд╝рд░реВрд░рдд рд╣реИ рд╡рд╣ рд╣реИ рдПрдХ рдСрдмреНрдЬреЗрдХреНрдЯ рдмрдирд╛рдирд╛ рдкреНрд░рдХрд╛рд╢рдХрдЧреНрд░рд╛рд╣рдХ, рд╡рд┐рдзрд┐ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рд╡рд┐рд╖рдп рдХрд╛ рдкрде рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░реЗрдВ topic_path
рдФрд░ рдлрд╝рдВрдХреНрд╢рди рдХреЛ рдХреЙрд▓ рдХрд░реЗрдВ publish
╤Б topic_path
рдФрд░ рдбреЗрдЯрд╛. рдХреГрдкрдпрд╛ рдзреНрдпрд╛рди рджреЗрдВ рдХрд┐ рд╣рдо рдЖрдпрд╛рдд рдХрд░рддреЗ рд╣реИрдВ generate_log_line
рд╣рдорд╛рд░реА рд╕реНрдХреНрд░рд┐рдкреНрдЯ рд╕реЗ stream_logs
, рдЗрд╕рд▓рд┐рдП рд╕реБрдирд┐рд╢реНрдЪрд┐рдд рдХрд░реЗрдВ рдХрд┐ рдпреЗ рдлрд╝рд╛рдЗрд▓реЗрдВ рдПрдХ рд╣реА рдлрд╝реЛрд▓реНрдбрд░ рдореЗрдВ рд╣реИрдВ, рдЕрдиреНрдпрдерд╛ рдЖрдкрдХреЛ рдПрдХ рдЖрдпрд╛рдд рддреНрд░реБрдЯрд┐ рдорд┐рд▓реЗрдЧреАред рдлрд┐рд░ рд╣рдо рдЗрд╕реЗ рдЕрдкрдиреЗ Google рдХрдВрд╕реЛрд▓ рдХреЗ рдорд╛рдзреНрдпрдо рд╕реЗ рдЪрд▓рд╛ рд╕рдХрддреЗ рд╣реИрдВ:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
рдЬреИрд╕реЗ рд╣реА рдлрд╝рд╛рдЗрд▓ рдЪрд▓рддреА рд╣реИ, рд╣рдо рдХрдВрд╕реЛрд▓ рдкрд░ рд▓реЙрдЧ рдбреЗрдЯрд╛ рдХрд╛ рдЖрдЙрдЯрдкреБрдЯ рджреЗрдЦ рдкрд╛рдПрдВрдЧреЗ, рдЬреИрд╕рд╛ рдХрд┐ рдиреАрдЪреЗ рджрд┐рдП рдЧрдП рдЪрд┐рддреНрд░ рдореЗрдВ рджрд┐рдЦрд╛рдпрд╛ рдЧрдпрд╛ рд╣реИред рдпрд╣ рд╕реНрдХреНрд░рд┐рдкреНрдЯ рддрдм рддрдХ рдХрд╛рдо рдХрд░реЗрдЧреА рдЬрдм рддрдХ рд╣рдо рдЗрд╕рдХрд╛ рдЙрдкрдпреЛрдЧ рдирд╣реАрдВ рдХрд░реЗрдВрдЧреЗ CTRL + CрдЗрд╕реЗ рдкреВрд░рд╛ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдПред
рдЪрд┐рддреНрд░ 4. рдЖрдЙрдЯрдкреБрдЯ publish_logs.py
рд╣рдорд╛рд░рд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдХреЛрдб рд▓рд┐рдЦрдирд╛
рдЕрдм рдЬрдм рд╣рдордиреЗ рд╕рдм рдХреБрдЫ рддреИрдпрд╛рд░ рдХрд░ рд▓рд┐рдпрд╛ рд╣реИ, рддреЛ рд╣рдо рдордЬрд╝реЗрджрд╛рд░ рд╣рд┐рд╕реНрд╕рд╛ рд╢реБрд░реВ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ - рдмреАрдо рдФрд░ рдкрд╛рдпрдерди рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдЕрдкрдиреА рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдХреЛ рдХреЛрдб рдХрд░рдирд╛ред рдмреАрдо рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдмрдирд╛рдиреЗ рдХреЗ рд▓рд┐рдП, рд╣рдореЗрдВ рдПрдХ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдСрдмреНрдЬреЗрдХреНрдЯ (рдкреА) рдмрдирд╛рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред рдПрдХ рдмрд╛рд░ рдЬрдм рд╣рдо рдПрдХ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдСрдмреНрдЬреЗрдХреНрдЯ рдмрдирд╛ рд▓реЗрддреЗ рд╣реИрдВ, рддреЛ рд╣рдо рдСрдкрд░реЗрдЯрд░ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдПрдХ рдХреЗ рдмрд╛рдж рдПрдХ рдХрдИ рдлрд╝рдВрдХреНрд╢рди рд▓рд╛рдЧреВ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ pipe (|)
. рд╕рд╛рдорд╛рдиреНрдп рддреМрд░ рдкрд░, рд╡рд░реНрдХрдлрд╝реНрд▓реЛ рдиреАрдЪреЗ рджреА рдЧрдИ рдЫрд╡рд┐ рдЬреИрд╕рд╛ рджрд┐рдЦрддрд╛ рд╣реИред
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
рд╣рдорд╛рд░реЗ рдХреЛрдб рдореЗрдВ, рд╣рдо рджреЛ рдХрд╕реНрдЯрдо рдлрд╝рдВрдХреНрд╢рди рдмрдирд╛рдПрдВрдЧреЗред рд╕рдорд╛рд░реЛрд╣ regex_clean
, рдЬреЛ рдбреЗрдЯрд╛ рдХреЛ рд╕реНрдХреИрди рдХрд░рддрд╛ рд╣реИ рдФрд░ рдлрд╝рдВрдХреНрд╢рди рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдкреИрдЯрд░реНрди рд╕реВрдЪреА рдХреЗ рдЖрдзрд╛рд░ рдкрд░ рд╕рдВрдмрдВрдзрд┐рдд рдкрдВрдХреНрддрд┐ рдХреЛ рдкреБрдирд░реНрдкреНрд░рд╛рдкреНрдд рдХрд░рддрд╛ рд╣реИ re.search
. рдлрд╝рдВрдХреНрд╢рди рдЕрд▓реНрдкрд╡рд┐рд░рд╛рдо рд╕реЗ рдЕрд▓рдЧ рдХреА рдЧрдИ рд╕реНрдЯреНрд░рд┐рдВрдЧ рд▓реМрдЯрд╛рддрд╛ рд╣реИред рдпрджрд┐ рдЖрдк рд░реЗрдЧреБрд▓рд░ рдПрдХреНрд╕рдкреНрд░реЗрд╢рди рд╡рд┐рд╢реЗрд╖рдЬреНрдЮ рдирд╣реАрдВ рд╣реИрдВ, рддреЛ рдореИрдВ рдЗрд╕реЗ рдЬрд╛рдВрдЪрдиреЗ рдХреА рд╕рд▓рд╛рд╣ рджреЗрддрд╛ рд╣реВрдВ datetime
рдХрд┐рд╕реА рдлрд╝рдВрдХреНрд╢рди рдХреЗ рдЕрдВрджрд░ рдЗрд╕реЗ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рд┐рдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдПред рдореБрдЭреЗ рдлрд╝рд╛рдЗрд▓ рдХреА рд╢реБрд░реБрдЖрдд рдореЗрдВ рдПрдХ рдЖрдпрд╛рдд рддреНрд░реБрдЯрд┐ рдорд┐рд▓ рд░рд╣реА рдереА, рдЬреЛ рдЕрдЬреАрдм рдереАред рдлрд┐рд░ рдпрд╣ рд╕реВрдЪреА рдлрд╝рдВрдХреНрд╢рди рдХреЛ рднреЗрдЬ рджреА рдЬрд╛рддреА рд╣реИ WriteToBigQuery, рдЬреЛ рдмрд╕ рд╣рдорд╛рд░реЗ рдбреЗрдЯрд╛ рдХреЛ рддрд╛рд▓рд┐рдХрд╛ рдореЗрдВ рдЬреЛрдбрд╝рддрд╛ рд╣реИред рдмреИрдЪ рдбреЗрдЯрд╛рдлрд╝реНрд▓реЛ рдЬреЙрдм рдФрд░ рд╕реНрдЯреНрд░реАрдорд┐рдВрдЧ рдбреЗрдЯрд╛рдлрд╝реНрд▓реЛ рдЬреЙрдм рдХрд╛ рдХреЛрдб рдиреАрдЪреЗ рджрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИред рдмреИрдЪ рдФрд░ рд╕реНрдЯреНрд░реАрдорд┐рдВрдЧ рдХреЛрдб рдХреЗ рдмреАрдЪ рдПрдХрдорд╛рддреНрд░ рдЕрдВрддрд░ рдпрд╣ рд╣реИ рдХрд┐ рдмреИрдЪ рдореЗрдВ рд╣рдо рд╕реАрдПрд╕рд╡реА рдХреЛ рдкрдврд╝рддреЗ рд╣реИрдВ src_path
рдлрд╝рдВрдХреНрд╢рди рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдирд╛ ReadFromText
рдмреАрдо рд╕реЗ.
рдмреИрдЪ рдбреЗрдЯрд╛рдлрд╝реНрд▓реЛ рдЬреЙрдм (рдмреИрдЪ рдкреНрд░реЛрд╕реЗрд╕рд┐рдВрдЧ)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
рд╕реНрдЯреНрд░реАрдорд┐рдВрдЧ рдбреЗрдЯрд╛рдлрд╝реНрд▓реЛ рдЬреЙрдм (рд╕реНрдЯреНрд░реАрдо рдкреНрд░реЛрд╕реЗрд╕рд┐рдВрдЧ)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
рдХрдиреНрд╡реЗрдпрд░ рд╢реБрд░реВ рдХрд░рдирд╛
рд╣рдо рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдХреЛ рдХрдИ рдЕрд▓рдЧ-рдЕрд▓рдЧ рддрд░реАрдХреЛрдВ рд╕реЗ рдЪрд▓рд╛ рд╕рдХрддреЗ рд╣реИрдВред рдпрджрд┐ рд╣рдо рдЪрд╛рд╣реЗрдВ, рддреЛ рд╣рдо GCP рдореЗрдВ рджреВрд░рд╕реНрде рд░реВрдк рд╕реЗ рд▓реЙрдЧ рдЗрди рдХрд░рддреЗ рд╣реБрдП рдЗрд╕реЗ рдЯрд░реНрдорд┐рдирд▓ рд╕реЗ рд╕реНрдерд╛рдиреАрдп рд░реВрдк рд╕реЗ рдЪрд▓рд╛ рд╕рдХрддреЗ рд╣реИрдВред
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
рд╣рд╛рд▓рд╛рдБрдХрд┐, рд╣рдо рдЗрд╕реЗ DataFlow рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдЪрд▓рд╛рдиреЗ рдЬрд╛ рд░рд╣реЗ рд╣реИрдВред рд╣рдо рдирд┐рдореНрдирд▓рд┐рдЦрд┐рдд рдЖрд╡рд╢реНрдпрдХ рдкреИрд░рд╛рдореАрдЯрд░ рд╕реЗрдЯ рдХрд░рдХреЗ рдиреАрдЪреЗ рджрд┐рдП рдЧрдП рдХрдорд╛рдВрдб рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдРрд╕рд╛ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВред
project
- рдЖрдкрдХреЗ рдЬреАрд╕реАрдкреА рдкреНрд░реЛрдЬреЗрдХреНрдЯ рдХреА рдЖрдИрдбреАредrunner
рдПрдХ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд░рдирд░ рд╣реИ рдЬреЛ рдЖрдкрдХреЗ рдкреНрд░реЛрдЧреНрд░рд╛рдо рдХрд╛ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдХрд░реЗрдЧрд╛ рдФрд░ рдЖрдкрдХреА рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдХрд╛ рдирд┐рд░реНрдорд╛рдг рдХрд░реЗрдЧрд╛ред рдХреНрд▓рд╛рдЙрдб рдореЗрдВ рдЪрд▓рд╛рдиреЗ рдХреЗ рд▓рд┐рдП, рдЖрдкрдХреЛ рдПрдХ DataflowRunner рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░рдирд╛ рд╣реЛрдЧрд╛редstaging_location
- рдХрд╛рдо рдХрд░рдиреЗ рд╡рд╛рд▓реЗ рдкреНрд░реЛрд╕реЗрд╕рд░ рдХреЗ рд▓рд┐рдП рдЖрд╡рд╢реНрдпрдХ рдХреЛрдб рдкреИрдХреЗрдЬреЛрдВ рдХреЛ рдЕрдиреБрдХреНрд░рдорд┐рдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдХреНрд▓рд╛рдЙрдб рдбреЗрдЯрд╛рдлрд╝реНрд▓реЛ рдХреНрд▓рд╛рдЙрдб рд╕реНрдЯреЛрд░реЗрдЬ рдХрд╛ рдкрдередtemp_location
- рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рдиреЗ рдХреЗ рджреМрд░рд╛рди рдмрдирд╛рдИ рдЧрдИ рдЕрд╕реНрдерд╛рдпреА рдиреМрдХрд░реА рдлрд╝рд╛рдЗрд▓реЛрдВ рдХреЛ рд╕рдВрдЧреНрд░рд╣реАрдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдХреНрд▓рд╛рдЙрдб рдбреЗрдЯрд╛рдлреНрд▓реЛ рдХреНрд▓рд╛рдЙрдб рд╕реНрдЯреЛрд░реЗрдЬ рдХрд╛ рдкрдередstreaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
рдЬрдм рдпрд╣ рдХрдорд╛рдВрдб рдЪрд▓ рд░рд╣рд╛ рд╣реЛ, рд╣рдо Google рдХрдВрд╕реЛрд▓ рдореЗрдВ рдбреЗрдЯрд╛рдлреНрд▓реЛ рдЯреИрдм рдкрд░ рдЬрд╛ рд╕рдХрддреЗ рд╣реИрдВ рдФрд░ рдЕрдкрдиреА рдкрд╛рдЗрдкрд▓рд╛рдЗрди рджреЗрдЦ рд╕рдХрддреЗ рд╣реИрдВред рдЬрдм рд╣рдо рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдкрд░ рдХреНрд▓рд┐рдХ рдХрд░рддреЗ рд╣реИрдВ, рддреЛ рд╣рдореЗрдВ рдЪрд┐рддреНрд░ 4 рдХреЗ рд╕рдорд╛рди рдХреБрдЫ рджреЗрдЦрдирд╛ рдЪрд╛рд╣рд┐рдПред рдбрд┐рдмрдЧрд┐рдВрдЧ рдЙрджреНрджреЗрд╢реНрдпреЛрдВ рдХреЗ рд▓рд┐рдП, рд╡рд┐рд╕реНрддреГрдд рд▓реЙрдЧ рджреЗрдЦрдиреЗ рдХреЗ рд▓рд┐рдП рд▓реЙрдЧреНрд╕ рдФрд░ рдлрд┐рд░ рд╕реНрдЯреИрдХрдбреНрд░рд╛рдЗрд╡рд░ рдкрд░ рдЬрд╛рдирд╛ рдмрд╣реБрдд рдорджрджрдЧрд╛рд░ рд╣реЛ рд╕рдХрддрд╛ рд╣реИред рдЗрд╕рд╕реЗ рдореБрдЭреЗ рдХрдИ рдорд╛рдорд▓реЛрдВ рдореЗрдВ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рдВрдмрдВрдзреА рд╕рдорд╕реНрдпрд╛рдУрдВ рдХреЛ рд╣рд▓ рдХрд░рдиреЗ рдореЗрдВ рдорджрдж рдорд┐рд▓реА рд╣реИред
рдЪрд┐рддреНрд░ 4: рдмреАрдо рдХрдиреНрд╡реЗрдпрд░
BigQuery рдореЗрдВ рд╣рдорд╛рд░реЗ рдбреЗрдЯрд╛ рддрдХ рдкрд╣реБрдВрдЪреЗрдВ
рдЗрд╕рд▓рд┐рдП, рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдкрд╣рд▓реЗ рд╕реЗ рд╣реА рд╣рдорд╛рд░реА рддрд╛рд▓рд┐рдХрд╛ рдореЗрдВ рдбреЗрдЯрд╛ рдкреНрд░рд╡рд╛рд╣рд┐рдд рдХрд░рдиреЗ рд╡рд╛рд▓реА рдПрдХ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╣реЛрдиреА рдЪрд╛рд╣рд┐рдПред рдЗрд╕рдХрд╛ рдкрд░реАрдХреНрд╖рдг рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рд╣рдо BigQuery рдкрд░ рдЬрд╛ рд╕рдХрддреЗ рд╣реИрдВ рдФрд░ рдбреЗрдЯрд╛ рджреЗрдЦ рд╕рдХрддреЗ рд╣реИрдВред рдиреАрдЪреЗ рджрд┐рдП рдЧрдП рдХрдорд╛рдВрдб рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдиреЗ рдХреЗ рдмрд╛рдж рдЖрдкрдХреЛ рдбреЗрдЯрд╛рд╕реЗрдЯ рдХреА рдкрд╣рд▓реА рдХреБрдЫ рдкрдВрдХреНрддрд┐рдпрд╛рдБ рджреЗрдЦрдиреА рдЪрд╛рд╣рд┐рдПред рдЕрдм рдЬрдм рд╣рдорд╛рд░реЗ рдкрд╛рд╕ BigQuery рдореЗрдВ рд╕рдВрдЧреНрд░рд╣реАрдд рдбреЗрдЯрд╛ рд╣реИ, рддреЛ рд╣рдо рдЖрдЧреЗ рдХрд╛ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ, рд╕рд╛рде рд╣реА рд╕рд╣рдХрд░реНрдорд┐рдпреЛрдВ рдХреЗ рд╕рд╛рде рдбреЗрдЯрд╛ рд╕рд╛рдЭрд╛ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ рдФрд░ рд╡реНрдпрд╛рд╡рд╕рд╛рдпрд┐рдХ рдкреНрд░рд╢реНрдиреЛрдВ рдХрд╛ рдЙрддреНрддрд░ рджреЗрдирд╛ рд╢реБрд░реВ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВред
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
рдЪрд┐рддреНрд░ 5: рдмрд┐рдЧрдХреНрд╡реЗрд░реА
рдирд┐рд╖реНрдХрд░реНрд╖
рд╣рдореЗрдВ рдЙрдореНрдореАрдж рд╣реИ рдХрд┐ рдпрд╣ рдкреЛрд╕реНрдЯ рд╕реНрдЯреНрд░реАрдорд┐рдВрдЧ рдбреЗрдЯрд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдмрдирд╛рдиреЗ рдХреЗ рд╕рд╛рде-рд╕рд╛рде рдбреЗрдЯрд╛ рдХреЛ рдЕрдзрд┐рдХ рд╕реБрд▓рдн рдмрдирд╛рдиреЗ рдХреЗ рддрд░реАрдХреЗ рдЦреЛрдЬрдиреЗ рдХреЗ рд▓рд┐рдП рдПрдХ рдЙрдкрдпреЛрдЧреА рдЙрджрд╛рд╣рд░рдг рдХреЗ рд░реВрдк рдореЗрдВ рдХрд╛рдо рдХрд░реЗрдЧреАред рдЗрд╕ рдлреЙрд░реНрдореЗрдЯ рдореЗрдВ рдбреЗрдЯрд╛ рд╕реНрдЯреЛрд░ рдХрд░рдиреЗ рд╕реЗ рд╣рдореЗрдВ рдХрдИ рдлрд╛рдпрджреЗ рдорд┐рд▓рддреЗ рд╣реИрдВред рдЕрдм рд╣рдо рдорд╣рддреНрд╡рдкреВрд░реНрдг рд╕рд╡рд╛рд▓реЛрдВ рдХрд╛ рдЬрд╡рд╛рдм рджреЗрдирд╛ рд╢реБрд░реВ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ рдЬреИрд╕реЗ рдХрд┐ рдХрд┐рддрдиреЗ рд▓реЛрдЧ рд╣рдорд╛рд░реЗ рдЙрддреНрдкрд╛рдж рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рддреЗ рд╣реИрдВ? рдХреНрдпрд╛ рдЖрдкрдХрд╛ рдЙрдкрдпреЛрдЧрдХрд░реНрддрд╛ рдЖрдзрд╛рд░ рд╕рдордп рдХреЗ рд╕рд╛рде рдмрдврд╝ рд░рд╣рд╛ рд╣реИ? рд▓реЛрдЧ рдЙрддреНрдкрд╛рдж рдХреЗ рдХрд┐рди рдкрд╣рд▓реБрдУрдВ рдХреЗ рд╕рд╛рде рд╕рдмрд╕реЗ рдЕрдзрд┐рдХ рдЗрдВрдЯрд░реИрдХреНрдЯ рдХрд░рддреЗ рд╣реИрдВ? рдФрд░ рдХреНрдпрд╛ рдРрд╕реА рддреНрд░реБрдЯрд┐рдпрд╛рдВ рд╣реИрдВ рдЬрд╣рд╛рдВ рдирд╣реАрдВ рд╣реЛрдиреА рдЪрд╛рд╣рд┐рдП? рдпреЗ рд╡реЗ рдкреНрд░рд╢реНрди рд╣реИрдВ рдЬреЛ рд╕рдВрдЧрдарди рдХреЗ рд▓рд┐рдП рд░реБрдЪрд┐рдХрд░ рд╣реЛрдВрдЧреЗред рдЗрди рд╕рд╡рд╛рд▓реЛрдВ рдХреЗ рдЬрд╡рд╛рдмреЛрдВ рд╕реЗ рдЙрднрд░рдиреЗ рд╡рд╛рд▓реА рдЕрдВрддрд░реНрджреГрд╖реНрдЯрд┐ рдХреЗ рдЖрдзрд╛рд░ рдкрд░, рд╣рдо рдЙрддреНрдкрд╛рдж рдореЗрдВ рд╕реБрдзрд╛рд░ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ рдФрд░ рдЙрдкрдпреЛрдЧрдХрд░реНрддрд╛ рдЬреБрдбрд╝рд╛рд╡ рдмрдврд╝рд╛ рд╕рдХрддреЗ рд╣реИрдВред
рдЗрд╕ рдкреНрд░рдХрд╛рд░ рдХреЗ рд╡реНрдпрд╛рдпрд╛рдо рдХреЗ рд▓рд┐рдП рдмреАрдо рд╡рд╛рд╕реНрддрд╡ рдореЗрдВ рдЙрдкрдпреЛрдЧреА рд╣реИ рдФрд░ рдЗрд╕рдХреЗ рдХрдИ рдЕрдиреНрдп рджрд┐рд▓рдЪрд╕реНрдк рдЙрдкрдпреЛрдЧ рдХреЗ рдорд╛рдорд▓реЗ рднреА рд╣реИрдВред рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдЖрдк рд╡рд╛рд╕реНрддрд╡рд┐рдХ рд╕рдордп рдореЗрдВ рд╕реНрдЯреЙрдХ рдЯрд┐рдХ рдбреЗрдЯрд╛ рдХрд╛ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдХрд░рдирд╛ рдЪрд╛рд╣рддреЗ рд╣реИрдВ рдФрд░ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдХреЗ рдЖрдзрд╛рд░ рдкрд░ рд╡реНрдпрд╛рдкрд╛рд░ рдХрд░рдирд╛ рдЪрд╛рд╣рддреЗ рд╣реИрдВ, рд╢рд╛рдпрдж рдЖрдкрдХреЗ рдкрд╛рд╕ рд╡рд╛рд╣рдиреЛрдВ рд╕реЗ рдЖрдиреЗ рд╡рд╛рд▓рд╛ рд╕реЗрдВрд╕рд░ рдбреЗрдЯрд╛ рд╣реИ рдФрд░ рдЖрдк рдЯреНрд░реИрдлрд╝рд┐рдХ рд╕реНрддрд░ рдХреА рдЧрдгрдирд╛ рдХрд░рдирд╛ рдЪрд╛рд╣рддреЗ рд╣реИрдВред рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдЖрдк рдПрдХ рдЧреЗрдорд┐рдВрдЧ рдХрдВрдкрдиреА рднреА рд╣реЛ рд╕рдХрддреЗ рд╣реИрдВ рдЬреЛ рдЙрдкрдпреЛрдЧрдХрд░реНрддрд╛ рдбреЗрдЯрд╛ рдПрдХрддреНрд░ рдХрд░рддреА рд╣реИ рдФрд░ рдЗрд╕рдХрд╛ рдЙрдкрдпреЛрдЧ рдореБрдЦреНрдп рдореЗрдЯреНрд░рд┐рдХреНрд╕ рдХреЛ рдЯреНрд░реИрдХ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдбреИрд╢рдмреЛрд░реНрдб рдмрдирд╛рдиреЗ рдореЗрдВ рдХрд░рддреА рд╣реИред рдареАрдХ рд╣реИ, рд╕рдЬреНрдЬрдиреЛрдВ, рдпрд╣ рдПрдХ рдЕрдиреНрдп рдкреЛрд╕реНрдЯ рдХреЗ рд▓рд┐рдП рдПрдХ рд╡рд┐рд╖рдп рд╣реИ, рдкрдврд╝рдиреЗ рдХреЗ рд▓рд┐рдП рдзрдиреНрдпрд╡рд╛рдж, рдФрд░ рдЬреЛ рд▓реЛрдЧ рдкреВрд░рд╛ рдХреЛрдб рджреЗрдЦрдирд╛ рдЪрд╛рд╣рддреЗ рд╣реИрдВ, рдЙрдирдХреЗ рд▓рд┐рдП рдиреАрдЪреЗ рдореЗрд░реЗ GitHub рдХрд╛ рд▓рд┐рдВрдХ рд╣реИред
рдмрд╕ рдЗрддрдирд╛ рд╣реАред
рд╕реНрд░реЛрдд: www.habr.com