рдирдорд╕реНрддреЗ рд╕рдмреИред рд╣рд╛рдореА рд▓реЗрдЦрдХреЛ рдЕрдиреНрддрд┐рдо рднрд╛рдЧрдХреЛ рдЕрдиреБрд╡рд╛рдж рд╕рд╛рдЭрд╛ рдЧрд░реНрджреИрдЫреМрдВ, рд╡рд┐рд╢реЗрд╖ рдЧрд░реА рдкрд╛рдареНрдпрдХреНрд░рдордХрд╛ рд╡рд┐рджреНрдпрд╛рд░реНрдереАрд╣рд░реВрдХрд╛ рд▓рд╛рдЧрд┐ рддрдпрд╛рд░ рдЧрд░рд┐рдПрдХреЛред
Apache Beam рд░ DataFlow рд╡рд╛рд╕реНрддрд╡рд┐рдХ-рд╕рдордп рдкрд╛рдЗрдкрд▓рд╛рдЗрдирд╣рд░реВрдХреЛ рд▓рд╛рдЧрд┐
Google рдХреНрд▓рд╛рдЙрдб рд╕реЗрдЯрдЕрдк рдЧрд░реНрджреИ
рдиреЛрдЯ: рдореИрд▓реЗ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рд╛рдЙрди рд░ рдЕрдиреБрдХреВрд▓рди рд▓рдЧ рдбреЗрдЯрд╛ рдкреНрд░рдХрд╛рд╢рд┐рдд рдЧрд░реНрди Google рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓ рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрдВ рдХрд┐рдирднрдиреЗ рдорд▓рд╛рдИ рдкрд╛рдЗрдерди 3 рдорд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рд╛рдЙрди рд╕рдорд╕реНрдпрд╛ рднрдЗрд░рд╣реЗрдХреЛ рдерд┐рдпреЛред Google рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓рд▓реЗ рдкрд╛рдЗрдерди 2 рдкреНрд░рдпреЛрдЧ рдЧрд░реНрджрдЫ, рдЬреБрди Apache Beam рд╕рдБрдЧ рдЕрдзрд┐рдХ рд╕реБрд╕рдВрдЧрдд рдЫред
рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕реБрд░реБ рдЧрд░реНрди, рд╣рд╛рдореАрд▓реЗ рд╕реЗрдЯрд┐рдЩрд╣рд░реВрдорд╛ рдереЛрд░реИ рдЦрдиреНрдиреБрдкрд░реНрдЫред рддрдкрд╛рдИрдВрд╣рд░реВ рдордзреНрдпреЗ рдЬрд╕рд▓реЗ рдкрд╣рд┐рд▓реЗ GCP рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдиреБрднрдПрдХреЛ рдЫреИрди, рддрдкрд╛рдИрдВрд▓реЗ рдпрд╕рдорд╛ рдЙрд▓реНрд▓рд┐рдЦрд┐рдд рдирд┐рдореНрди 6 рдЪрд░рдгрд╣рд░реВ рдкрд╛рд▓рдирд╛ рдЧрд░реНрдиреБрдкрд░реНрдиреЗрдЫред
рдпрд╕ рдкрдЫрд┐, рд╣рд╛рдореАрд▓реЗ рд╣рд╛рдореНрд░рд╛ рд╕реНрдХреНрд░рд┐рдкреНрдЯрд╣рд░реВ рдЧреБрдЧрд▓ рдХреНрд▓рд╛рдЙрдб рднрдгреНрдбрд╛рд░рдгрдорд╛ рдЕрдкрд▓реЛрдб рдЧрд░реНрдиреБрдкрд░реНрдиреЗрдЫ рд░ рддрд┐рдиреАрд╣рд░реВрд▓рд╛рдИ рд╣рд╛рдореНрд░реЛ рдЧреБрдЧрд▓ рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓рдорд╛ рдкреНрд░рддрд┐рд▓рд┐рдкрд┐ рдЧрд░реНрди рдЖрд╡рд╢реНрдпрдХ рдЫред рдХреНрд▓рд╛рдЙрдб рднрдгреНрдбрд╛рд░рдгрдорд╛ рдЕрдкрд▓реЛрдб рдЧрд░реНрдиреБ рдПрдХрджрдо рдорд╛рдореВрд▓реА рдЫ (рдПрдХ рд╡рд┐рд╡рд░рдг рдлреЗрд▓рд╛ рдкрд╛рд░реНрди рд╕рдХрд┐рдиреНрдЫ
2 рдлрд┐рдЧрд░
рд╣рд╛рдореАрд▓реЗ рдлрд╛рдЗрд▓рд╣рд░реВ рдкреНрд░рддрд┐рд▓рд┐рдкрд┐ рдЧрд░реНрди рд░ рдЖрд╡рд╢реНрдпрдХ рдкреБрд╕реНрддрдХрд╛рд▓рдпрд╣рд░реВ рд╕реНрдерд╛рдкрдирд╛ рдЧрд░реНрди рдЖрд╡рд╢реНрдпрдХ рдЖрджреЗрд╢рд╣рд░реВ рддрд▓ рд╕реВрдЪреАрдмрджреНрдз рдЫрдиреНред
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
рд╣рд╛рдореНрд░реЛ рдбрд╛рдЯрд╛рдмреЗрд╕ рд░ рддрд╛рд▓рд┐рдХрд╛ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрджреИ
рд╣рд╛рдореАрд▓реЗ рд╕реЗрдЯрдЕрдк рд╕рдореНрдмрдиреНрдзрд┐рдд рд╕рдмреИ рдЪрд░рдгрд╣рд░реВ рдкреВрд░рд╛ рдЧрд░рд┐рд╕рдХреЗрдкрдЫрд┐, рд╣рд╛рдореАрд▓реЗ рдЧрд░реНрдиреБрдкрд░реНрдиреЗ рдЕрд░реНрдХреЛ рдХреБрд░рд╛ BigQuery рдорд╛ рдбреЗрдЯрд╛рд╕реЗрдЯ рд░ рддрд╛рд▓рд┐рдХрд╛ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреБ рд╣реЛред рдпрд╕рдХрд╛ рд▓рд╛рдЧрд┐ рдзреЗрд░реИ рддрд░рд┐рдХрд╛рд╣рд░реВ рдЫрдиреН, рддрд░ рд╕рдмреИрднрдиреНрджрд╛ рд╕рд░рд▓ рднрдиреЗрдХреЛ рдкрд╣рд┐рд▓реЛ рдбреЗрдЯрд╛рд╕реЗрдЯ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реЗрд░ Google рдХреНрд▓рд╛рдЙрдб рдХрдиреНрд╕реЛрд▓ рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдиреБ рд╣реЛред рддрдкрд╛рдИрдВ рддрд▓рдХрд╛ рдЪрд░рдгрд╣рд░реВ рдкрд╛рд▓рдирд╛ рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ
рдЪрд┐рддреНрд░ 3. рддрд╛рд▓рд┐рдХрд╛ рд▓реЗрдЖрдЙрдЯ
рдкреНрд░рдпреЛрдЧрдХрд░реНрддрд╛ рд▓рдЧ рдбреЗрдЯрд╛ рдкреНрд░рдХрд╛рд╢рд┐рдд рдЧрд░реНрджреИ
Pub/Sub рд╣рд╛рдореНрд░реЛ рдкрд╛рдЗрдкрд▓рд╛рдЗрдирдХреЛ рдПрдХ рдорд╣рддреНрд╡рдкреВрд░реНрдг рднрд╛рдЧ рд╣реЛ рдХрд┐рдирднрдиреЗ рдпрд╕рд▓реЗ рдзреЗрд░реИ рд╕реНрд╡рддрдиреНрддреНрд░ рдЕрдиреБрдкреНрд░рдпреЛрдЧрд╣рд░реВрд▓рд╛рдИ рдПрдХрдЕрд░реНрдХрд╛рд╕рдБрдЧ рд╕рдЮреНрдЪрд╛рд░ рдЧрд░реНрди рдЕрдиреБрдорддрд┐ рджрд┐рдиреНрдЫред рд╡рд┐рд╢реЗрд╖ рдЧрд░реА, рдпрд╕рд▓реЗ рдПрдХ рдордзреНрдпрд╕реНрдердХреЛ рд░реВрдкрдорд╛ рдХрд╛рдо рдЧрд░реНрджрдЫ рдЬрд╕рд▓реЗ рд╣рд╛рдореАрд▓рд╛рдИ рдЕрдиреБрдкреНрд░рдпреЛрдЧрд╣рд░реВ рдмреАрдЪ рд╕рдиреНрджреЗрд╢рд╣рд░реВ рдкрдард╛рдЙрди рд░ рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрди рдЕрдиреБрдорддрд┐ рджрд┐рдиреНрдЫред рд╣рд╛рдореАрд▓реЗ рдЧрд░реНрдиреБ рдкрд░реНрдиреЗ рдкрд╣рд┐рд▓реЛ рдХреБрд░рд╛ рднрдиреЗрдХреЛ рд╡рд┐рд╖рдп рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреБ рд╣реЛред рдХреЗрд╡рд▓ рдХрдиреНрд╕реЛрд▓рдорд╛ рдкрдм/рд╕рдмрдорд╛ рдЬрд╛рдиреБрд╣реЛрд╕реН рд░ рд╡рд┐рд╖рдп рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреБрд╣реЛрд╕реН рдХреНрд▓рд┐рдХ рдЧрд░реНрдиреБрд╣реЛрд╕реНред
рддрд▓рдХреЛ рдХреЛрдбрд▓реЗ рдорд╛рдерд┐ рдкрд░рд┐рднрд╛рд╖рд┐рдд рд▓рдЧ рдбреЗрдЯрд╛ рдЙрддреНрдкрдиреНрди рдЧрд░реНрди рд╣рд╛рдореНрд░реЛ рд╕реНрдХреНрд░рд┐рдкреНрдЯрд▓рд╛рдИ рдХрд▓ рдЧрд░реНрдЫ рд░ рддреНрдпрд╕рдкрдЫрд┐ рд▓рдЧрд╣рд░реВрд▓рд╛рдИ Pub/Sub рдорд╛ рдЬрдбрд╛рди рдЧрд░реА рдкрдард╛рдЙрдБрдЫред рд╣рд╛рдореАрд▓реЗ рдЧрд░реНрдиреБ рдкрд░реНрдиреЗ рдХреБрд░рд╛ рднрдиреЗрдХреЛ рд╡рд╕реНрддреБ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреБ рд╣реЛ рдкреНрд░рдХрд╛рд╢рдХ рдЧреНрд░рд╛рд╣рдХ, рд╡рд┐рдзрд┐ рдкреНрд░рдпреЛрдЧ рдЧрд░реА рд╡рд┐рд╖рдпрдХреЛ рдорд╛рд░реНрдЧ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдЧрд░реНрдиреБрд╣реЛрд╕реН topic_path
рд░ рд╕рдорд╛рд░реЛрд╣рдорд╛ рдХрд▓ рдЧрд░реНрдиреБрд╣реЛрд╕реН publish
╤Б topic_path
рд░ рдбрд╛рдЯрд╛ред рдХреГрдкрдпрд╛ рдзреНрдпрд╛рди рджрд┐рдиреБрд╣реЛрд╕реН рдХрд┐ рд╣рд╛рдореА рдЖрдпрд╛рдд рдЧрд░реНрдЫреМрдВ generate_log_line
рд╣рд╛рдореНрд░реЛ рд▓рд┐рдкрд┐рдмрд╛рдЯ stream_logs
, рддреНрдпрд╕реИрд▓реЗ рдпреА рдлрд╛рдЗрд▓рд╣рд░реВ рдПрдЙрдЯреИ рдлреЛрд▓реНрдбрд░рдорд╛ рдЫрдиреН рднрдиреНрдиреЗ рдирд┐рд╢реНрдЪрд┐рдд рдЧрд░реНрдиреБрд╣реЛрд╕реН, рдЕрдиреНрдпрдерд╛ рддрдкрд╛рдИрдВрд▓реЗ рдЖрдпрд╛рдд рддреНрд░реБрдЯрд┐ рдкрд╛рдЙрдиреБрд╣реБрдиреЗрдЫред рддреНрдпрд╕рдкрдЫрд┐ рд╣рд╛рдореА рдпрд╕рд▓рд╛рдИ рд╣рд╛рдореНрд░реЛ рдЧреБрдЧрд▓ рдХрдиреНрд╕реЛрд▓ рдорд╛рд░реНрдлрдд рдЪрд▓рд╛рдЙрди рд╕рдХреНрдЫреМрдВ:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
рдлрд╛рдИрд▓ рдЪрд▓реНрдиреЗ рдмрд┐рддреНрддрд┐рдХреИ, рд╣рд╛рдореА рдХрдиреНрд╕реЛрд▓рдорд╛ рд▓рдЧ рдбрд╛рдЯрд╛рдХреЛ рдЖрдЙрдЯрдкреБрдЯ рд╣реЗрд░реНрди рд╕рдХреНрд╖рдо рд╣реБрдиреЗрдЫреМрдВ, рдЬрд╕реНрддреИ рддрд▓рдХреЛ рдЪрд┐рддреНрд░рдорд╛ рджреЗрдЦрд╛рдЗрдПрдХреЛ рдЫред рд╣рд╛рдореАрд▓реЗ рдкреНрд░рдпреЛрдЧ рдирдЧрд░реЗрд╕рдореНрдо рдпреЛ рд▓рд┐рдкрд┐рд▓реЗ рдХрд╛рдо рдЧрд░реНрдиреЗрдЫ CTRL + Cрдпрд╕рд▓рд╛рдИ рдкреВрд░рд╛ рдЧрд░реНрдиред
рдЪрд┐рддреНрд░ рек. рдЖрдЙрдЯрдкреБрдЯ publish_logs.py
рд╣рд╛рдореНрд░реЛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдХреЛрдб рд▓реЗрдЦреНрджреИ
рдЕрдм рдЬрдм рд╣рд╛рдореАрд╕рдБрдЧ рд╕рдмреИ рддрдпрд╛рд░ рдЫ, рд╣рд╛рдореА рд░рдорд╛рдЗрд▓реЛ рднрд╛рдЧ рд╕реБрд░реБ рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ - рдмреАрдо рд░ рдкрд╛рдЗрдерди рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рд╣рд╛рдореНрд░реЛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдХреЛрдбрд┐рдЩред рдмреАрдо рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрди, рд╣рд╛рдореАрд▓реЗ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╡рд╕реНрддреБ (p) рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрди рдЖрд╡рд╢реНрдпрдХ рдЫред рдПрдХрдкрдЯрдХ рд╣рд╛рдореАрд▓реЗ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╡рд╕реНрддреБ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░рд┐рд╕рдХреЗрдкрдЫрд┐, рд╣рд╛рдореА рдЕрдкрд░реЗрдЯрд░ рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рдПрдХрдкрдЫрд┐ рдЕрд░реНрдХреЛ рдзреЗрд░реИ рдкреНрд░рдХрд╛рд░реНрдпрд╣рд░реВ рд▓рд╛рдЧреВ рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ pipe (|)
ред рд╕рд╛рдорд╛рдиреНрдпрддрдпрд╛, рдХрд╛рд░реНрдпрдкреНрд░рд╡рд╛рд╣ рддрд▓рдХреЛ рдЫрд╡рд┐ рдЬрд╕реНрддреЛ рджреЗрдЦрд┐рдиреНрдЫред
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
рд╣рд╛рдореНрд░реЛ рдХреЛрдбрдорд╛, рд╣рд╛рдореА рджреБрдИ рдЕрдиреБрдХреВрд▓рди рдкреНрд░рдХрд╛рд░реНрдпрд╣рд░реВ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреЗрдЫреМрдВред рд╕рдорд╛рд░реЛрд╣ regex_clean
, рдЬрд╕рд▓реЗ рдбреЗрдЯрд╛ рд╕реНрдХреНрдпрд╛рди рдЧрд░реНрджрдЫ рд░ рдкреНрд░рдХрд╛рд░реНрдп рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ PATTERNS рд╕реВрдЪреАрдорд╛ рдЖрдзрд╛рд░рд┐рдд рд╕рдореНрдмрдиреНрдзрд┐рдд рдкрдЩреНрдХреНрддрд┐ рдкреБрди: рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрджрдЫред re.search
ред рдкреНрд░рдХрд╛рд░реНрдпрд▓реЗ рдЕрд▓реНрдкрд╡рд┐рд░рд╛рдо рдЫреБрдЯреНрдпрд╛рдПрдХреЛ рд╕реНрдЯреНрд░рд┐рдЩ рдлрд░реНрдХрд╛рдЙрдБрдЫред рдпрджрд┐ рддрдкрд╛рдИрдВ рдирд┐рдпрдорд┐рдд рдЕрднрд┐рд╡реНрдпрдХреНрддрд┐ рд╡рд┐рд╢реЗрд╖рдЬреНрдЮ рд╣реБрдиреБрд╣реБрдиреНрди рднрдиреЗ, рдо рдпрд╕рд▓рд╛рдИ рдЬрд╛рдБрдЪ рдЧрд░реНрди рд╕рд┐рдлрд╛рд░рд┐рд╕ рдЧрд░реНрдЫреБ datetime
рдпрд╕рд▓рд╛рдИ рдХрд╛рдо рдЧрд░реНрди рдХреЛ рд▓рд╛рдЧреА рдПрдХ рдкреНрд░рдХрд╛рд░реНрдп рднрд┐рддреНрд░ред рдореИрд▓реЗ рдлрд╛рдЗрд▓рдХреЛ рд╕реБрд░реБрдорд╛ рдЖрдпрд╛рдд рддреНрд░реБрдЯрд┐ рдкреНрд░рд╛рдкреНрдд рдЧрд░рд┐рд░рд╣реЗрдХреЛ рдерд┐рдПрдБ, рдЬреБрди рдЕрдиреМрдареЛ рдерд┐рдпреЛред рдпреЛ рд╕реВрдЪреА рддреНрдпрд╕рдкрдЫрд┐ рд╕рдорд╛рд░реЛрд╣рдорд╛ рдкрдард╛рдЗрдиреНрдЫ BigQuery рд▓реЗрдЦреНрдиреБрд╣реЛрд╕реН, рдЬрд╕рд▓реЗ рд╣рд╛рдореНрд░реЛ рдбрд╛рдЯрд╛рд▓рд╛рдИ рдЯреЗрдмрд▓рдорд╛ рдердкреНрдЫред рдмреНрдпрд╛рдЪ DataFlow Job рд░ Streaming DataFlow Job рдХреЛ рд▓рд╛рдЧрд┐ рдХреЛрдб рддрд▓ рджрд┐рдЗрдПрдХреЛ рдЫред рдмреНрдпрд╛рдЪ рд░ рд╕реНрдЯреНрд░рд┐рдорд┐рдЩ рдХреЛрдб рдмреАрдЪрдХреЛ рднрд┐рдиреНрдирддрд╛ рднрдиреЗрдХреЛ рдмреНрдпрд╛рдЪрдорд╛ рд╣рд╛рдореАрд▓реЗ CSV рдмрд╛рдЯ рдкрдвреНрдЫреМрдВ src_path
рдкреНрд░рдХрд╛рд░реНрдп рдкреНрд░рдпреЛрдЧ рдЧрд░реНрджреИ ReadFromText
рдмреАрдо рдмрд╛рдЯред
рдмреНрдпрд╛рдЪ рдбрд╛рдЯрд╛рдлреНрд▓реЛ рдХрд╛рд░реНрдп (рдмреНрдпрд╛рдЪ рдкреНрд░рд╢реЛрдзрди)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
рд╕реНрдЯреНрд░рд┐рдорд┐рдЩ рдбрд╛рдЯрд╛рдлреНрд▓реЛ рдХрд╛рд░реНрдп (рд╕реНрдЯреНрд░рд┐рдо рдкреНрд░рд╢реЛрдзрди)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
рдХрдиреНрд╡реЗрдпрд░ рд╕реБрд░реБ рдЧрд░реНрджреИ
рд╣рд╛рдореА рд╡рд┐рднрд┐рдиреНрди рддрд░рд┐рдХрд╛рдорд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рд╛рдЙрди рд╕рдХреНрдЫреМрдВред рдпрджрд┐ рд╣рд╛рдореАрд▓реЗ рдЪрд╛рд╣реЗрдХреЛ рдЦрдгреНрдбрдорд╛, рдЯрд╛рдврд╛рдмрд╛рдЯ GCP рдорд╛ рд▓рдЧ рдЗрди рдЧрд░реНрджрд╛ рд╣рд╛рдореАрд▓реЗ рдпрд╕рд▓рд╛рдИ рдЯрд░реНрдорд┐рдирд▓рдмрд╛рдЯ рд╕реНрдерд╛рдиреАрдп рд░реВрдкрдорд╛ рдЪрд▓рд╛рдЙрди рд╕рдХреНрдЫреМрдВред
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
рдпрджреНрдпрдкрд┐, рд╣рд╛рдореА рдпрд╕рд▓рд╛рдИ DataFlow рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рдЪрд▓рд╛рдЙрдиреЗ рдЫреМрдВред рд╣рд╛рдореА рдирд┐рдореНрди рдЖрд╡рд╢реНрдпрдХ рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░рд╣рд░реВ рд╕реЗрдЯ рдЧрд░реЗрд░ рддрд▓рдХреЛ рдЖрджреЗрд╢ рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рдпреЛ рдЧрд░реНрди рд╕рдХреНрдЫреМрдВред
project
тАФ рддрдкрд╛рдИрдВрдХреЛ GCP рдкрд░рд┐рдпреЛрдЬрдирд╛рдХреЛ IDредrunner
рдПрдХ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдзрд╛рд╡рдХ рд╣реЛ рдЬрд╕рд▓реЗ рддрдкрд╛рдИрдВрдХреЛ рдХрд╛рд░реНрдпрдХреНрд░рдордХреЛ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдЧрд░реНрдиреЗрдЫ рд░ рддрдкрд╛рдИрдВрдХреЛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдирд┐рд░реНрдорд╛рдг рдЧрд░реНрдиреЗрдЫред рдХреНрд▓рд╛рдЙрдбрдорд╛ рдЪрд▓рд╛рдЙрдирдХреЛ рд▓рд╛рдЧрд┐, рддрдкрд╛рдИрдВрд▓реЗ DataflowRunner рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдЧрд░реНрдиреБрдкрд░реНрдЫредstaging_location
тАФ рдХрд╛рдо рдкреНрд░рджрд░реНрд╢рди рдЧрд░реНрдиреЗ рдкреНрд░реЛрд╕реЗрд╕рд░рд╣рд░реВрд▓рд╛рдИ рдЖрд╡рд╢реНрдпрдХ рдкрд░реНрдиреЗ рдХреЛрдб рдкреНрдпрд╛рдХреЗрдЬрд╣рд░реВ рдЕрдиреБрдХреНрд░рдордгрд┐рдХрд╛рдХреЛ рд▓рд╛рдЧрд┐ рдХреНрд▓рд╛рдЙрдб рдбрд╛рдЯрд╛рдлреНрд▓реЛ рдХреНрд▓рд╛рдЙрдб рднрдгреНрдбрд╛рд░рдгрдХреЛ рд▓рд╛рдЧрд┐ рдорд╛рд░реНрдЧредtemp_location
- рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рд┐рд░рд╣реЗрдХреЛ рдмреЗрд▓рд╛ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░рд┐рдПрдХрд╛ рдЕрд╕реНрдерд╛рдпреА рдХрд╛рдо рдлрд╛рдЗрд▓рд╣рд░реВ рднрдгреНрдбрд╛рд░рдг рдЧрд░реНрдирдХреЛ рд▓рд╛рдЧрд┐ рдХреНрд▓рд╛рдЙрдб рдбрд╛рдЯрд╛рдлреНрд▓реЛ рдХреНрд▓рд╛рдЙрдб рднрдгреНрдбрд╛рд░рдгрдХреЛ рд▓рд╛рдЧрд┐ рдорд╛рд░реНрдЧредstreaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
рдпреЛ рдЖрджреЗрд╢ рдЪрд▓рд┐рд░рд╣реЗрдХреЛ рдмреЗрд▓рд╛, рд╣рд╛рдореА Google рдХрдиреНрд╕реЛрд▓рдорд╛ DataFlow рдЯреНрдпрд╛рдмрдорд╛ рдЬрд╛рди рд╕рдХреНрдЫреМрдВ рд░ рд╣рд╛рдореНрд░реЛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╣реЗрд░реНрди рд╕рдХреНрдЫреМрдВред рдЬрдм рд╣рд╛рдореА рдкрд╛рдЗрдкрд▓рд╛рдЗрдирдорд╛ рдХреНрд▓рд┐рдХ рдЧрд░реНрдЫреМрдВ, рд╣рд╛рдореАрд▓реЗ рдЪрд┐рддреНрд░ 4 рдЬрд╕реНрддреИ рдХреЗрд╣рд┐ рджреЗрдЦреНрдиреБрдкрд░реНрдЫред рдбрд┐рдмрдЧрд┐рдЩ рдЙрджреНрджреЗрд╢реНрдпрдХрд╛ рд▓рд╛рдЧрд┐, рд╡рд┐рд╕реНрддреГрдд рд▓рдЧрд╣рд░реВ рд╣реЗрд░реНрди рд▓рдЧрд╣рд░реВрдорд╛ рд░ рддреНрдпрд╕рдкрдЫрд┐ Stackdriver рдорд╛ рдЬрд╛рди рдзреЗрд░реИ рдЙрдкрдпреЛрдЧреА рд╣реБрди рд╕рдХреНрдЫред рдпрд╕рд▓реЗ рдорд▓рд╛рдИ рдзреЗрд░реИ рдХреЗрд╕рд╣рд░реВрдорд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рдорд╕реНрдпрд╛рд╣рд░реВ рд╕рдорд╛рдзрд╛рди рдЧрд░реНрди рдорджреНрджрдд рдЧрд░реЗрдХреЛ рдЫред
рдЪрд┐рддреНрд░ 4: рдмреАрдо рдХрдиреНрд╡реЗрдпрд░
BigQuery рдорд╛ рд╣рд╛рдореНрд░реЛ рдбреЗрдЯрд╛ рдкрд╣реБрдБрдЪ рдЧрд░реНрдиреБрд╣реЛрд╕реН
рддреНрдпрд╕реЛрднрдП, рд╣рд╛рдореАрд╕рдБрдЧ рдкрд╣рд┐рд▓реЗ рдиреИ рд╣рд╛рдореНрд░реЛ рддрд╛рд▓рд┐рдХрд╛рдорд╛ рдбрд╛рдЯрд╛ рдкреНрд░рд╡рд╛рд╣ рднрдПрдХреЛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рд┐рд░рд╣реЗрдХреЛ рд╣реБрдиреБрдкрд░реНрдЫред рдпреЛ рдкрд░реАрдХреНрд╖рдг рдЧрд░реНрди, рд╣рд╛рдореА BigQuery рдорд╛ рдЧрдПрд░ рдбреЗрдЯрд╛ рд╣реЗрд░реНрди рд╕рдХреНрдЫреМрдВред рддрд▓рдХреЛ рдЖрджреЗрд╢ рдкреНрд░рдпреЛрдЧ рдЧрд░рд┐рд╕рдХреЗрдкрдЫрд┐ рддрдкрд╛рдИрдВрд▓реЗ рдбреЗрдЯрд╛рд╕реЗрдЯрдХреЛ рдкрд╣рд┐рд▓реЛ рдХреЗрд╣реА рдкрдЩреНрдХреНрддрд┐рд╣рд░реВ рджреЗрдЦреНрдиреБрдкрд░реНрдЫред рдЕрдм рд╣рд╛рдореАрд╕рдБрдЧ BigQuery рдорд╛ рднрдгреНрдбрд╛рд░рдг рдЧрд░рд┐рдПрдХреЛ рдбреЗрдЯрд╛ рдЫ, рд╣рд╛рдореА рдердк рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ, рд╕рд╛рдереИ рдбреЗрдЯрд╛ рд╕рд╣рдХрд░реНрдореАрд╣рд░реВрд╕рдБрдЧ рд╕рд╛рдЭреЗрджрд╛рд░реА рдЧрд░реНрди рд░ рд╡реНрдпрд╡рд╕рд╛рдпрд┐рдХ рдкреНрд░рд╢реНрдирд╣рд░реВрдХреЛ рдЬрд╡рд╛рдл рджрд┐рди рдерд╛рд▓реНрдЫреМрдВред
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
рдЪрд┐рддреНрд░ рел: BigQuery
рдирд┐рд╖реНрдХрд░реНрд╖рдорд╛
рд╣рд╛рдореА рдЖрд╢рд╛ рдЧрд░реНрджрдЫреМрдВ рдХрд┐ рдпреЛ рдкреЛрд╖реНрдЯрд▓реЗ рд╕реНрдЯреНрд░рд┐рдорд┐рдЩ рдбреЗрдЯрд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреЗ, рд╕рд╛рдереИ рдбреЗрдЯрд╛рд▓рд╛рдИ рдердк рдкрд╣реБрдБрдЪрдпреЛрдЧреНрдп рдмрдирд╛рдЙрдиреЗ рддрд░рд┐рдХрд╛рд╣рд░реВ рдЦреЛрдЬреНрдиреЗ рдЙрдкрдпреЛрдЧреА рдЙрджрд╛рд╣рд░рдгрдХреЛ рд░реВрдкрдорд╛ рдХрд╛рдо рдЧрд░реНрджрдЫред рдпрд╕ рдврд╛рдБрдЪрд╛рдорд╛ рдбрд╛рдЯрд╛ рднрдгреНрдбрд╛рд░рдгрд▓реЗ рд╣рд╛рдореАрд▓рд╛рдИ рдзреЗрд░реИ рдлрд╛рдЗрджрд╛рд╣рд░реВ рджрд┐рдиреНрдЫред рдЕрдм рд╣рд╛рдореА рдорд╣рддреНрддреНрд╡рдкреВрд░реНрдг рдкреНрд░рд╢реНрдирд╣рд░реВрдХреЛ рдЬрд╡рд╛рдл рджрд┐рди рд╕реБрд░реБ рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ рдЬрд╕реНрддреИ рдХрддрд┐ рдорд╛рдирд┐рд╕рд╣рд░реВрд▓реЗ рд╣рд╛рдореНрд░реЛ рдЙрддреНрдкрд╛рджрди рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдЫрдиреН? рдХреЗ рддрдкрд╛рдЗрдБрдХреЛ рдкреНрд░рдпреЛрдЧрдХрд░реНрддрд╛ рдЖрдзрд╛рд░ рд╕рдордп рд╕рдВрдЧ рдмрдвреНрджреИ рдЫ? рдЙрддреНрдкрд╛рджрдирдХрд╛ рдХреБрди рдкрдХреНрд╖рд╣рд░реВрд╕рдБрдЧ рдорд╛рдирд┐рд╕рд╣рд░реВрд▓реЗ рд╕рдмреИрднрдиреНрджрд╛ рдмрдвреА рдЕрдиреНрддрд░рдХреНрд░рд┐рдпрд╛ рдЧрд░реНрдЫрдиреН? рд░ рддреНрдпрд╣рд╛рдБ рддреНрд░реБрдЯрд┐рд╣рд░реВ рдЫрдиреН рдЬрд╣рд╛рдБ рд╣реБрдиреБ рд╣реБрдБрджреИрди? рдпреА рдкреНрд░рд╢реНрдирд╣рд░реВ рд╣реБрдиреН рдЬреБрди рд╕рдВрд╕реНрдерд╛рдХреЛ рдЪрд╛рд╕реЛрдХреЛ рд╡рд┐рд╖рдп рд╣реБрдиреЗрдЫред рдпреА рдкреНрд░рд╢реНрдирд╣рд░реВрдХреЛ рдЬрд╡рд╛рдлрдмрд╛рдЯ рдЙрддреНрдкрдиреНрди рд╣реБрдиреЗ рдЕрдиреНрддрд░рджреГрд╖реНрдЯрд┐рд╣рд░реВрдХреЛ рдЖрдзрд╛рд░рдорд╛, рд╣рд╛рдореА рдЙрддреНрдкрд╛рджрди рд╕реБрдзрд╛рд░ рдЧрд░реНрди рд░ рдкреНрд░рдпреЛрдЧрдХрд░реНрддрд╛ рд╕рдВрд▓рдЧреНрдирддрд╛ рдмрдврд╛рдЙрди рд╕рдХреНрдЫреМрдВред
рдмреАрдо рдпрд╕ рдкреНрд░рдХрд╛рд░рдХреЛ рд╡реНрдпрд╛рдпрд╛рдордХреЛ рд▓рд╛рдЧрд┐ рд╕рд╛рдБрдЪреНрдЪреИ рдЙрдкрдпреЛрдЧреА рдЫ рд░ рд╕рд╛рдереИ рдЕрдиреНрдп рд░реЛрдЪрдХ рдкреНрд░рдпреЛрдЧ рдХреЗрд╕рд╣рд░реВ рдкрдирд┐ рдЫрдиреНред рдЙрджрд╛рд╣рд░рдгрдХрд╛ рд▓рд╛рдЧрд┐, рддрдкрд╛рдИрдВ рд╡рд╛рд╕реНрддрд╡рд┐рдХ рд╕рдордпрдорд╛ рд╕реНрдЯрдХ рдЯрд┐рдХ рдбрд╛рдЯрд╛рдХреЛ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдЧрд░реНрди рд░ рд╡рд┐рд╢реНрд▓реЗрд╖рдгрдорд╛ рдЖрдзрд╛рд░рд┐рдд рдЯреНрд░реЗрдбрд╣рд░реВ рдЧрд░реНрди рдЪрд╛рд╣рдиреБрд╣реБрдиреНрдЫ, рд╕рд╛рдпрдж рддрдкрд╛рдИрдВрд╕рдБрдЧ рд╕рд╡рд╛рд░реА рд╕рд╛рдзрдирд╣рд░реВрдмрд╛рдЯ рдЖрдЙрдиреЗ рд╕реЗрдиреНрд╕рд░ рдбреЗрдЯрд╛ рдЫ рд░ рдЯреНрд░рд╛рдлрд┐рдХ рд╕реНрддрд░ рдЧрдгрдирд╛рд╣рд░реВ рдЧрдгрдирд╛ рдЧрд░реНрди рдЪрд╛рд╣рдиреБрд╣реБрдиреНрдЫред рддрдкрд╛рдИрдВ рдкрдирд┐, рдЙрджрд╛рд╣рд░рдгрдХрд╛ рд▓рд╛рдЧрд┐, рдПрдХ рдЧреЗрдорд┐рдЩ рдХрдореНрдкрдиреА рд╣реБрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ рдЬрд╕рд▓реЗ рдкреНрд░рдпреЛрдЧрдХрд░реНрддрд╛ рдбреЗрдЯрд╛ рд╕рдЩреНрдХрд▓рди рдЧрд░реНрджрдЫ рд░ рдореБрдЦреНрдп рдореЗрдЯреНрд░рд┐рдХреНрд╕ рдЯреНрд░реНрдпрд╛рдХ рдЧрд░реНрди рдбреНрдпрд╛рд╕рдмреЛрд░реНрдбрд╣рд░реВ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрди рдкреНрд░рдпреЛрдЧ рдЧрд░реНрджрдЫред рдареАрдХ рдЫ, рд╕рдЬреНрдЬрдирд╣рд░реВ, рдпреЛ рдЕрд░реНрдХреЛ рдкреЛрд╖реНрдЯрдХреЛ рд▓рд╛рдЧрд┐ рд╡рд┐рд╖рдп рд╣реЛ, рдкрдвреНрдирдХреЛ рд▓рд╛рдЧрд┐ рдзрдиреНрдпрд╡рд╛рдж, рд░ рдкреВрд░реНрдг рдХреЛрдб рд╣реЗрд░реНрди рдЪрд╛рд╣рдиреЗрд╣рд░реВрдХрд╛ рд▓рд╛рдЧрд┐, рддрд▓ рдореЗрд░реЛ GitHub рдХреЛ рд▓рд┐рдЩреНрдХ рдЫред
рдпрддрд┐ рдиреИред
рд╕реНрд░реЛрдд: www.habr.com