рд╣рд╛рдореА рд╕реНрдЯреНрд░рд┐рдо рдбрд╛рдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдЩ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдЫреМрдВред рднрд╛рдЧ 2

рдирдорд╕реНрддреЗ рд╕рдмреИред рд╣рд╛рдореА рд▓реЗрдЦрдХреЛ рдЕрдиреНрддрд┐рдо рднрд╛рдЧрдХреЛ рдЕрдиреБрд╡рд╛рдж рд╕рд╛рдЭрд╛ рдЧрд░реНрджреИрдЫреМрдВ, рд╡рд┐рд╢реЗрд╖ рдЧрд░реА рдкрд╛рдареНрдпрдХреНрд░рдордХрд╛ рд╡рд┐рджреНрдпрд╛рд░реНрдереАрд╣рд░реВрдХрд╛ рд▓рд╛рдЧрд┐ рддрдпрд╛рд░ рдЧрд░рд┐рдПрдХреЛред рдбрд╛рдЯрд╛ рдЗрдиреНрдЬрд┐рдирд┐рдпрд░ред рдкрд╣рд┐рд▓реЛ рднрд╛рдЧ рдкрдвреНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ рдпрд╣рд╛рдБ.

Apache Beam рд░ DataFlow рд╡рд╛рд╕реНрддрд╡рд┐рдХ-рд╕рдордп рдкрд╛рдЗрдкрд▓рд╛рдЗрдирд╣рд░реВрдХреЛ рд▓рд╛рдЧрд┐

рд╣рд╛рдореА рд╕реНрдЯреНрд░рд┐рдо рдбрд╛рдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдЩ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдЫреМрдВред рднрд╛рдЧ 2

Google рдХреНрд▓рд╛рдЙрдб рд╕реЗрдЯрдЕрдк рдЧрд░реНрджреИ

рдиреЛрдЯ: рдореИрд▓реЗ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рд╛рдЙрди рд░ рдЕрдиреБрдХреВрд▓рди рд▓рдЧ рдбреЗрдЯрд╛ рдкреНрд░рдХрд╛рд╢рд┐рдд рдЧрд░реНрди Google рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓ рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрдВ рдХрд┐рдирднрдиреЗ рдорд▓рд╛рдИ рдкрд╛рдЗрдерди 3 рдорд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рд╛рдЙрди рд╕рдорд╕реНрдпрд╛ рднрдЗрд░рд╣реЗрдХреЛ рдерд┐рдпреЛред Google рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓рд▓реЗ рдкрд╛рдЗрдерди 2 рдкреНрд░рдпреЛрдЧ рдЧрд░реНрджрдЫ, рдЬреБрди Apache Beam рд╕рдБрдЧ рдЕрдзрд┐рдХ рд╕реБрд╕рдВрдЧрдд рдЫред

рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕реБрд░реБ рдЧрд░реНрди, рд╣рд╛рдореАрд▓реЗ рд╕реЗрдЯрд┐рдЩрд╣рд░реВрдорд╛ рдереЛрд░реИ рдЦрдиреНрдиреБрдкрд░реНрдЫред рддрдкрд╛рдИрдВрд╣рд░реВ рдордзреНрдпреЗ рдЬрд╕рд▓реЗ рдкрд╣рд┐рд▓реЗ GCP рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдиреБрднрдПрдХреЛ рдЫреИрди, рддрдкрд╛рдИрдВрд▓реЗ рдпрд╕рдорд╛ рдЙрд▓реНрд▓рд┐рдЦрд┐рдд рдирд┐рдореНрди 6 рдЪрд░рдгрд╣рд░реВ рдкрд╛рд▓рдирд╛ рдЧрд░реНрдиреБрдкрд░реНрдиреЗрдЫред рдкреГрд╖реНрда.

рдпрд╕ рдкрдЫрд┐, рд╣рд╛рдореАрд▓реЗ рд╣рд╛рдореНрд░рд╛ рд╕реНрдХреНрд░рд┐рдкреНрдЯрд╣рд░реВ рдЧреБрдЧрд▓ рдХреНрд▓рд╛рдЙрдб рднрдгреНрдбрд╛рд░рдгрдорд╛ рдЕрдкрд▓реЛрдб рдЧрд░реНрдиреБрдкрд░реНрдиреЗрдЫ рд░ рддрд┐рдиреАрд╣рд░реВрд▓рд╛рдИ рд╣рд╛рдореНрд░реЛ рдЧреБрдЧрд▓ рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓рдорд╛ рдкреНрд░рддрд┐рд▓рд┐рдкрд┐ рдЧрд░реНрди рдЖрд╡рд╢реНрдпрдХ рдЫред рдХреНрд▓рд╛рдЙрдб рднрдгреНрдбрд╛рд░рдгрдорд╛ рдЕрдкрд▓реЛрдб рдЧрд░реНрдиреБ рдПрдХрджрдо рдорд╛рдореВрд▓реА рдЫ (рдПрдХ рд╡рд┐рд╡рд░рдг рдлреЗрд▓рд╛ рдкрд╛рд░реНрди рд╕рдХрд┐рдиреНрдЫ рдпрд╣рд╛рдБ)ред рд╣рд╛рдореНрд░рд╛ рдлрд╛рдЗрд▓рд╣рд░реВ рдкреНрд░рддрд┐рд▓рд┐рдкрд┐ рдЧрд░реНрди, рд╣рд╛рдореА рддрд▓рдХреЛ рдЪрд┐рддреНрд░ реи рдорд╛ рдмрд╛рдБрдпрд╛рдорд╛ рд░рд╣реЗрдХреЛ рдкрд╣рд┐рд▓реЛ рдЖрдЗрдХрдирдорд╛ рдХреНрд▓рд┐рдХ рдЧрд░реЗрд░ рдЙрдкрдХрд░рдгрдкрдЯреНрдЯреАрдмрд╛рдЯ рдЧреБрдЧрд▓ рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓ рдЦреЛрд▓реНрди рд╕рдХреНрдЫреМрдВред

рд╣рд╛рдореА рд╕реНрдЯреНрд░рд┐рдо рдбрд╛рдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдЩ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдЫреМрдВред рднрд╛рдЧ 2
2 рдлрд┐рдЧрд░

рд╣рд╛рдореАрд▓реЗ рдлрд╛рдЗрд▓рд╣рд░реВ рдкреНрд░рддрд┐рд▓рд┐рдкрд┐ рдЧрд░реНрди рд░ рдЖрд╡рд╢реНрдпрдХ рдкреБрд╕реНрддрдХрд╛рд▓рдпрд╣рд░реВ рд╕реНрдерд╛рдкрдирд╛ рдЧрд░реНрди рдЖрд╡рд╢реНрдпрдХ рдЖрджреЗрд╢рд╣рд░реВ рддрд▓ рд╕реВрдЪреАрдмрджреНрдз рдЫрдиреНред

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

рд╣рд╛рдореНрд░реЛ рдбрд╛рдЯрд╛рдмреЗрд╕ рд░ рддрд╛рд▓рд┐рдХрд╛ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрджреИ

рд╣рд╛рдореАрд▓реЗ рд╕реЗрдЯрдЕрдк рд╕рдореНрдмрдиреНрдзрд┐рдд рд╕рдмреИ рдЪрд░рдгрд╣рд░реВ рдкреВрд░рд╛ рдЧрд░рд┐рд╕рдХреЗрдкрдЫрд┐, рд╣рд╛рдореАрд▓реЗ рдЧрд░реНрдиреБрдкрд░реНрдиреЗ рдЕрд░реНрдХреЛ рдХреБрд░рд╛ BigQuery рдорд╛ рдбреЗрдЯрд╛рд╕реЗрдЯ рд░ рддрд╛рд▓рд┐рдХрд╛ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреБ рд╣реЛред рдпрд╕рдХрд╛ рд▓рд╛рдЧрд┐ рдзреЗрд░реИ рддрд░рд┐рдХрд╛рд╣рд░реВ рдЫрдиреН, рддрд░ рд╕рдмреИрднрдиреНрджрд╛ рд╕рд░рд▓ рднрдиреЗрдХреЛ рдкрд╣рд┐рд▓реЛ рдбреЗрдЯрд╛рд╕реЗрдЯ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реЗрд░ Google рдХреНрд▓рд╛рдЙрдб рдХрдиреНрд╕реЛрд▓ рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдиреБ рд╣реЛред рддрдкрд╛рдИрдВ рддрд▓рдХрд╛ рдЪрд░рдгрд╣рд░реВ рдкрд╛рд▓рдирд╛ рдЧрд░реНрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ рд▓рд┐рдЩреНрдХрд╕реНрдХреАрдорд╛рдХреЛ рд╕рд╛рде рддрд╛рд▓рд┐рдХрд╛ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиред рд╣рд╛рдореНрд░реЛ рдЯреЗрдмрд▓ рд╣реБрдиреЗрдЫ резреи рд╕реНрддрдореНрднрд╣рд░реВ, рдкреНрд░рддреНрдпреЗрдХ рдкреНрд░рдпреЛрдЧрдХрд░реНрддрд╛ рд▓рдЧ рдХреЛ рдЕрд╡рдпрд╡рд╣рд░реБ рд╕рдВрдЧрддред рд╕реБрд╡рд┐рдзрд╛рдХреЛ рд▓рд╛рдЧрд┐, рд╣рд╛рдореА рдЯрд╛рдЗрдорд▓реЛрдХрд▓ рднреЗрд░рд┐рдПрдмрд▓ рдмрд╛рд╣реЗрдХ рд╕рдмреИ рд╕реНрддрдореНрднрд╣рд░реВрд▓рд╛рдИ рд╕реНрдЯреНрд░рд┐рдЩрдХреЛ рд░реВрдкрдорд╛ рдкрд░рд┐рднрд╛рд╖рд┐рдд рдЧрд░реНрдиреЗрдЫреМрдВ, рд░ рд╣рд╛рдореАрд▓реЗ рдкрд╣рд┐рд▓реЗ рдЙрддреНрдкрдиреНрди рдЧрд░реЗрдХрд╛ рдЪрд░рд╣рд░реВ рдЕрдиреБрд╕рд╛рд░ рддрд┐рдиреАрд╣рд░реВрд▓рд╛рдИ рдирд╛рдо рджрд┐рдиреЗрдЫреМрдВред рд╣рд╛рдореНрд░реЛ рддрд╛рд▓рд┐рдХрд╛рдХреЛ рд▓реЗрдЖрдЙрдЯ рдЪрд┐рддреНрд░ 3 рдорд╛ рджреЗрдЦрд┐рдиреБ рдкрд░реНрдЫред

рд╣рд╛рдореА рд╕реНрдЯреНрд░рд┐рдо рдбрд╛рдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдЩ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдЫреМрдВред рднрд╛рдЧ 2
рдЪрд┐рддреНрд░ 3. рддрд╛рд▓рд┐рдХрд╛ рд▓реЗрдЖрдЙрдЯ

рдкреНрд░рдпреЛрдЧрдХрд░реНрддрд╛ рд▓рдЧ рдбреЗрдЯрд╛ рдкреНрд░рдХрд╛рд╢рд┐рдд рдЧрд░реНрджреИ

Pub/Sub рд╣рд╛рдореНрд░реЛ рдкрд╛рдЗрдкрд▓рд╛рдЗрдирдХреЛ рдПрдХ рдорд╣рддреНрд╡рдкреВрд░реНрдг рднрд╛рдЧ рд╣реЛ рдХрд┐рдирднрдиреЗ рдпрд╕рд▓реЗ рдзреЗрд░реИ рд╕реНрд╡рддрдиреНрддреНрд░ рдЕрдиреБрдкреНрд░рдпреЛрдЧрд╣рд░реВрд▓рд╛рдИ рдПрдХрдЕрд░реНрдХрд╛рд╕рдБрдЧ рд╕рдЮреНрдЪрд╛рд░ рдЧрд░реНрди рдЕрдиреБрдорддрд┐ рджрд┐рдиреНрдЫред рд╡рд┐рд╢реЗрд╖ рдЧрд░реА, рдпрд╕рд▓реЗ рдПрдХ рдордзреНрдпрд╕реНрдердХреЛ рд░реВрдкрдорд╛ рдХрд╛рдо рдЧрд░реНрджрдЫ рдЬрд╕рд▓реЗ рд╣рд╛рдореАрд▓рд╛рдИ рдЕрдиреБрдкреНрд░рдпреЛрдЧрд╣рд░реВ рдмреАрдЪ рд╕рдиреНрджреЗрд╢рд╣рд░реВ рдкрдард╛рдЙрди рд░ рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрди рдЕрдиреБрдорддрд┐ рджрд┐рдиреНрдЫред рд╣рд╛рдореАрд▓реЗ рдЧрд░реНрдиреБ рдкрд░реНрдиреЗ рдкрд╣рд┐рд▓реЛ рдХреБрд░рд╛ рднрдиреЗрдХреЛ рд╡рд┐рд╖рдп рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреБ рд╣реЛред рдХреЗрд╡рд▓ рдХрдиреНрд╕реЛрд▓рдорд╛ рдкрдм/рд╕рдмрдорд╛ рдЬрд╛рдиреБрд╣реЛрд╕реН рд░ рд╡рд┐рд╖рдп рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреБрд╣реЛрд╕реН рдХреНрд▓рд┐рдХ рдЧрд░реНрдиреБрд╣реЛрд╕реНред

рддрд▓рдХреЛ рдХреЛрдбрд▓реЗ рдорд╛рдерд┐ рдкрд░рд┐рднрд╛рд╖рд┐рдд рд▓рдЧ рдбреЗрдЯрд╛ рдЙрддреНрдкрдиреНрди рдЧрд░реНрди рд╣рд╛рдореНрд░реЛ рд╕реНрдХреНрд░рд┐рдкреНрдЯрд▓рд╛рдИ рдХрд▓ рдЧрд░реНрдЫ рд░ рддреНрдпрд╕рдкрдЫрд┐ рд▓рдЧрд╣рд░реВрд▓рд╛рдИ Pub/Sub рдорд╛ рдЬрдбрд╛рди рдЧрд░реА рдкрдард╛рдЙрдБрдЫред рд╣рд╛рдореАрд▓реЗ рдЧрд░реНрдиреБ рдкрд░реНрдиреЗ рдХреБрд░рд╛ рднрдиреЗрдХреЛ рд╡рд╕реНрддреБ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреБ рд╣реЛ рдкреНрд░рдХрд╛рд╢рдХ рдЧреНрд░рд╛рд╣рдХ, рд╡рд┐рдзрд┐ рдкреНрд░рдпреЛрдЧ рдЧрд░реА рд╡рд┐рд╖рдпрдХреЛ рдорд╛рд░реНрдЧ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдЧрд░реНрдиреБрд╣реЛрд╕реН topic_path рд░ рд╕рдорд╛рд░реЛрд╣рдорд╛ рдХрд▓ рдЧрд░реНрдиреБрд╣реЛрд╕реН publish ╤Б topic_path рд░ рдбрд╛рдЯрд╛ред рдХреГрдкрдпрд╛ рдзреНрдпрд╛рди рджрд┐рдиреБрд╣реЛрд╕реН рдХрд┐ рд╣рд╛рдореА рдЖрдпрд╛рдд рдЧрд░реНрдЫреМрдВ generate_log_line рд╣рд╛рдореНрд░реЛ рд▓рд┐рдкрд┐рдмрд╛рдЯ stream_logs, рддреНрдпрд╕реИрд▓реЗ рдпреА рдлрд╛рдЗрд▓рд╣рд░реВ рдПрдЙрдЯреИ рдлреЛрд▓реНрдбрд░рдорд╛ рдЫрдиреН рднрдиреНрдиреЗ рдирд┐рд╢реНрдЪрд┐рдд рдЧрд░реНрдиреБрд╣реЛрд╕реН, рдЕрдиреНрдпрдерд╛ рддрдкрд╛рдИрдВрд▓реЗ рдЖрдпрд╛рдд рддреНрд░реБрдЯрд┐ рдкрд╛рдЙрдиреБрд╣реБрдиреЗрдЫред рддреНрдпрд╕рдкрдЫрд┐ рд╣рд╛рдореА рдпрд╕рд▓рд╛рдИ рд╣рд╛рдореНрд░реЛ рдЧреБрдЧрд▓ рдХрдиреНрд╕реЛрд▓ рдорд╛рд░реНрдлрдд рдЪрд▓рд╛рдЙрди рд╕рдХреНрдЫреМрдВ:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

рдлрд╛рдИрд▓ рдЪрд▓реНрдиреЗ рдмрд┐рддреНрддрд┐рдХреИ, рд╣рд╛рдореА рдХрдиреНрд╕реЛрд▓рдорд╛ рд▓рдЧ рдбрд╛рдЯрд╛рдХреЛ рдЖрдЙрдЯрдкреБрдЯ рд╣реЗрд░реНрди рд╕рдХреНрд╖рдо рд╣реБрдиреЗрдЫреМрдВ, рдЬрд╕реНрддреИ рддрд▓рдХреЛ рдЪрд┐рддреНрд░рдорд╛ рджреЗрдЦрд╛рдЗрдПрдХреЛ рдЫред рд╣рд╛рдореАрд▓реЗ рдкреНрд░рдпреЛрдЧ рдирдЧрд░реЗрд╕рдореНрдо рдпреЛ рд▓рд┐рдкрд┐рд▓реЗ рдХрд╛рдо рдЧрд░реНрдиреЗрдЫ CTRL + Cрдпрд╕рд▓рд╛рдИ рдкреВрд░рд╛ рдЧрд░реНрдиред

рд╣рд╛рдореА рд╕реНрдЯреНрд░рд┐рдо рдбрд╛рдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдЩ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдЫреМрдВред рднрд╛рдЧ 2
рдЪрд┐рддреНрд░ рек. рдЖрдЙрдЯрдкреБрдЯ publish_logs.py

рд╣рд╛рдореНрд░реЛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдХреЛрдб рд▓реЗрдЦреНрджреИ

рдЕрдм рдЬрдм рд╣рд╛рдореАрд╕рдБрдЧ рд╕рдмреИ рддрдпрд╛рд░ рдЫ, рд╣рд╛рдореА рд░рдорд╛рдЗрд▓реЛ рднрд╛рдЧ рд╕реБрд░реБ рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ - рдмреАрдо рд░ рдкрд╛рдЗрдерди рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рд╣рд╛рдореНрд░реЛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдХреЛрдбрд┐рдЩред рдмреАрдо рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрди, рд╣рд╛рдореАрд▓реЗ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╡рд╕реНрддреБ (p) рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрди рдЖрд╡рд╢реНрдпрдХ рдЫред рдПрдХрдкрдЯрдХ рд╣рд╛рдореАрд▓реЗ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╡рд╕реНрддреБ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░рд┐рд╕рдХреЗрдкрдЫрд┐, рд╣рд╛рдореА рдЕрдкрд░реЗрдЯрд░ рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рдПрдХрдкрдЫрд┐ рдЕрд░реНрдХреЛ рдзреЗрд░реИ рдкреНрд░рдХрд╛рд░реНрдпрд╣рд░реВ рд▓рд╛рдЧреВ рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ pipe (|)ред рд╕рд╛рдорд╛рдиреНрдпрддрдпрд╛, рдХрд╛рд░реНрдпрдкреНрд░рд╡рд╛рд╣ рддрд▓рдХреЛ рдЫрд╡рд┐ рдЬрд╕реНрддреЛ рджреЗрдЦрд┐рдиреНрдЫред

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

рд╣рд╛рдореНрд░реЛ рдХреЛрдбрдорд╛, рд╣рд╛рдореА рджреБрдИ рдЕрдиреБрдХреВрд▓рди рдкреНрд░рдХрд╛рд░реНрдпрд╣рд░реВ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреЗрдЫреМрдВред рд╕рдорд╛рд░реЛрд╣ regex_clean, рдЬрд╕рд▓реЗ рдбреЗрдЯрд╛ рд╕реНрдХреНрдпрд╛рди рдЧрд░реНрджрдЫ рд░ рдкреНрд░рдХрд╛рд░реНрдп рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ PATTERNS рд╕реВрдЪреАрдорд╛ рдЖрдзрд╛рд░рд┐рдд рд╕рдореНрдмрдиреНрдзрд┐рдд рдкрдЩреНрдХреНрддрд┐ рдкреБрди: рдкреНрд░рд╛рдкреНрдд рдЧрд░реНрджрдЫред re.searchред рдкреНрд░рдХрд╛рд░реНрдпрд▓реЗ рдЕрд▓реНрдкрд╡рд┐рд░рд╛рдо рдЫреБрдЯреНрдпрд╛рдПрдХреЛ рд╕реНрдЯреНрд░рд┐рдЩ рдлрд░реНрдХрд╛рдЙрдБрдЫред рдпрджрд┐ рддрдкрд╛рдИрдВ рдирд┐рдпрдорд┐рдд рдЕрднрд┐рд╡реНрдпрдХреНрддрд┐ рд╡рд┐рд╢реЗрд╖рдЬреНрдЮ рд╣реБрдиреБрд╣реБрдиреНрди рднрдиреЗ, рдо рдпрд╕рд▓рд╛рдИ рдЬрд╛рдБрдЪ рдЧрд░реНрди рд╕рд┐рдлрд╛рд░рд┐рд╕ рдЧрд░реНрдЫреБ рдЯреНрдпреВрдЯреЛрд░рд┐рдпрд▓ рд░ рдХреЛрдб рдЬрд╛рдБрдЪ рдЧрд░реНрди рдиреЛрдЯрдкреНрдпрд╛рдбрдорд╛ рдЕрднреНрдпрд╛рд╕ рдЧрд░реНрдиреБрд╣реЛрд╕реНред рдпрд╕ рдкрдЫрд┐ рд╣рд╛рдореА рдХрд╕реНрдЯрдо ParDo рдкреНрд░рдХрд╛рд░реНрдп рдкрд░рд┐рднрд╛рд╖рд┐рдд рдЧрд░реНрдЫреМрдВ рд╡рд┐рднрд╛рдЬрди, рдЬреБрди рд╕рдорд╛рдирд╛рдиреНрддрд░ рдкреНрд░рд╢реЛрдзрдирдХреЛ рд▓рд╛рдЧрд┐ рдмреАрдо рд░реВрдкрд╛рдиреНрддрд░рдгрдХреЛ рднрд┐рдиреНрдирддрд╛ рд╣реЛред рдкрд╛рдЗрдердирдорд╛, рдпреЛ рдПрдХ рд╡рд┐рд╢реЗрд╖ рддрд░рд┐рдХрд╛рдорд╛ рдЧрд░рд┐рдиреНрдЫ - рд╣рд╛рдореАрд▓реЗ DoFn рдмреАрдо рд╡рд░реНрдЧрдмрд╛рдЯ тАЛтАЛрдЗрдирд╣реЗрд░рд┐рдЯ рдЧрд░реНрдиреЗ рдХрдХреНрд╖рд╛ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреБрдкрд░реНрдЫред рд╕реНрдкреНрд▓рд┐рдЯ рдкреНрд░рдХрд╛рд░реНрдпрд▓реЗ рдЕрдШрд┐рд▓реНрд▓реЛ рдкреНрд░рдХрд╛рд░реНрдпрдмрд╛рдЯ рдкрд╛рд░реНрд╕ рдЧрд░рд┐рдПрдХреЛ рдкрдЩреНрдХреНрддрд┐ рд▓рд┐рдиреНрдЫ рд░ рд╣рд╛рдореНрд░реЛ BigQuery рддрд╛рд▓рд┐рдХрд╛рдорд╛ рд╕реНрддрдореНрдн рдирд╛рдорд╣рд░реВрд╕рдБрдЧ рд╕рдореНрдмрдиреНрдзрд┐рдд рдХреБрдЮреНрдЬреАрд╣рд░реВ рд╕рд╣рд┐рдд рд╢рдмреНрджрдХреЛрд╢рд╣рд░реВрдХреЛ рд╕реВрдЪреА рдлрд░реНрдХрд╛рдЙрдБрдЫред рдпрд╕ рдкреНрд░рдХрд╛рд░реНрдпрдХреЛ рдмрд╛рд░реЗрдорд╛ рдиреЛрдЯ рдЧрд░реНрди рдХреЗрд╣рд┐ рдЫ: рдореИрд▓реЗ рдЖрдпрд╛рдд рдЧрд░реНрдиреБрдкрд░реНтАНрдпреЛ datetime рдпрд╕рд▓рд╛рдИ рдХрд╛рдо рдЧрд░реНрди рдХреЛ рд▓рд╛рдЧреА рдПрдХ рдкреНрд░рдХрд╛рд░реНрдп рднрд┐рддреНрд░ред рдореИрд▓реЗ рдлрд╛рдЗрд▓рдХреЛ рд╕реБрд░реБрдорд╛ рдЖрдпрд╛рдд рддреНрд░реБрдЯрд┐ рдкреНрд░рд╛рдкреНрдд рдЧрд░рд┐рд░рд╣реЗрдХреЛ рдерд┐рдПрдБ, рдЬреБрди рдЕрдиреМрдареЛ рдерд┐рдпреЛред рдпреЛ рд╕реВрдЪреА рддреНрдпрд╕рдкрдЫрд┐ рд╕рдорд╛рд░реЛрд╣рдорд╛ рдкрдард╛рдЗрдиреНрдЫ BigQuery рд▓реЗрдЦреНрдиреБрд╣реЛрд╕реН, рдЬрд╕рд▓реЗ рд╣рд╛рдореНрд░реЛ рдбрд╛рдЯрд╛рд▓рд╛рдИ рдЯреЗрдмрд▓рдорд╛ рдердкреНрдЫред рдмреНрдпрд╛рдЪ DataFlow Job рд░ Streaming DataFlow Job рдХреЛ рд▓рд╛рдЧрд┐ рдХреЛрдб рддрд▓ рджрд┐рдЗрдПрдХреЛ рдЫред рдмреНрдпрд╛рдЪ рд░ рд╕реНрдЯреНрд░рд┐рдорд┐рдЩ рдХреЛрдб рдмреАрдЪрдХреЛ рднрд┐рдиреНрдирддрд╛ рднрдиреЗрдХреЛ рдмреНрдпрд╛рдЪрдорд╛ рд╣рд╛рдореАрд▓реЗ CSV рдмрд╛рдЯ рдкрдвреНрдЫреМрдВ src_pathрдкреНрд░рдХрд╛рд░реНрдп рдкреНрд░рдпреЛрдЧ рдЧрд░реНрджреИ ReadFromText рдмреАрдо рдмрд╛рдЯред

рдмреНрдпрд╛рдЪ рдбрд╛рдЯрд╛рдлреНрд▓реЛ рдХрд╛рд░реНрдп (рдмреНрдпрд╛рдЪ рдкреНрд░рд╢реЛрдзрди)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

рд╕реНрдЯреНрд░рд┐рдорд┐рдЩ рдбрд╛рдЯрд╛рдлреНрд▓реЛ рдХрд╛рд░реНрдп (рд╕реНрдЯреНрд░рд┐рдо рдкреНрд░рд╢реЛрдзрди)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

рдХрдиреНрд╡реЗрдпрд░ рд╕реБрд░реБ рдЧрд░реНрджреИ

рд╣рд╛рдореА рд╡рд┐рднрд┐рдиреНрди рддрд░рд┐рдХрд╛рдорд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рд╛рдЙрди рд╕рдХреНрдЫреМрдВред рдпрджрд┐ рд╣рд╛рдореАрд▓реЗ рдЪрд╛рд╣реЗрдХреЛ рдЦрдгреНрдбрдорд╛, рдЯрд╛рдврд╛рдмрд╛рдЯ GCP рдорд╛ рд▓рдЧ рдЗрди рдЧрд░реНрджрд╛ рд╣рд╛рдореАрд▓реЗ рдпрд╕рд▓рд╛рдИ рдЯрд░реНрдорд┐рдирд▓рдмрд╛рдЯ рд╕реНрдерд╛рдиреАрдп рд░реВрдкрдорд╛ рдЪрд▓рд╛рдЙрди рд╕рдХреНрдЫреМрдВред

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

рдпрджреНрдпрдкрд┐, рд╣рд╛рдореА рдпрд╕рд▓рд╛рдИ DataFlow рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рдЪрд▓рд╛рдЙрдиреЗ рдЫреМрдВред рд╣рд╛рдореА рдирд┐рдореНрди рдЖрд╡рд╢реНрдпрдХ рдкреНрдпрд╛рд░рд╛рдорд┐рдЯрд░рд╣рд░реВ рд╕реЗрдЯ рдЧрд░реЗрд░ рддрд▓рдХреЛ рдЖрджреЗрд╢ рдкреНрд░рдпреЛрдЧ рдЧрд░реЗрд░ рдпреЛ рдЧрд░реНрди рд╕рдХреНрдЫреМрдВред

  • project тАФ рддрдкрд╛рдИрдВрдХреЛ GCP рдкрд░рд┐рдпреЛрдЬрдирд╛рдХреЛ IDред
  • runner рдПрдХ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдзрд╛рд╡рдХ рд╣реЛ рдЬрд╕рд▓реЗ рддрдкрд╛рдИрдВрдХреЛ рдХрд╛рд░реНрдпрдХреНрд░рдордХреЛ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдЧрд░реНрдиреЗрдЫ рд░ рддрдкрд╛рдИрдВрдХреЛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдирд┐рд░реНрдорд╛рдг рдЧрд░реНрдиреЗрдЫред рдХреНрд▓рд╛рдЙрдбрдорд╛ рдЪрд▓рд╛рдЙрдирдХреЛ рд▓рд╛рдЧрд┐, рддрдкрд╛рдИрдВрд▓реЗ DataflowRunner рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдЧрд░реНрдиреБрдкрд░реНрдЫред
  • staging_location тАФ рдХрд╛рдо рдкреНрд░рджрд░реНрд╢рди рдЧрд░реНрдиреЗ рдкреНрд░реЛрд╕реЗрд╕рд░рд╣рд░реВрд▓рд╛рдИ рдЖрд╡рд╢реНрдпрдХ рдкрд░реНрдиреЗ рдХреЛрдб рдкреНрдпрд╛рдХреЗрдЬрд╣рд░реВ рдЕрдиреБрдХреНрд░рдордгрд┐рдХрд╛рдХреЛ рд▓рд╛рдЧрд┐ рдХреНрд▓рд╛рдЙрдб рдбрд╛рдЯрд╛рдлреНрд▓реЛ рдХреНрд▓рд╛рдЙрдб рднрдгреНрдбрд╛рд░рдгрдХреЛ рд▓рд╛рдЧрд┐ рдорд╛рд░реНрдЧред
  • temp_location - рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рд┐рд░рд╣реЗрдХреЛ рдмреЗрд▓рд╛ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░рд┐рдПрдХрд╛ рдЕрд╕реНрдерд╛рдпреА рдХрд╛рдо рдлрд╛рдЗрд▓рд╣рд░реВ рднрдгреНрдбрд╛рд░рдг рдЧрд░реНрдирдХреЛ рд▓рд╛рдЧрд┐ рдХреНрд▓рд╛рдЙрдб рдбрд╛рдЯрд╛рдлреНрд▓реЛ рдХреНрд▓рд╛рдЙрдб рднрдгреНрдбрд╛рд░рдгрдХреЛ рд▓рд╛рдЧрд┐ рдорд╛рд░реНрдЧред
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

рдпреЛ рдЖрджреЗрд╢ рдЪрд▓рд┐рд░рд╣реЗрдХреЛ рдмреЗрд▓рд╛, рд╣рд╛рдореА Google рдХрдиреНрд╕реЛрд▓рдорд╛ DataFlow рдЯреНрдпрд╛рдмрдорд╛ рдЬрд╛рди рд╕рдХреНрдЫреМрдВ рд░ рд╣рд╛рдореНрд░реЛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╣реЗрд░реНрди рд╕рдХреНрдЫреМрдВред рдЬрдм рд╣рд╛рдореА рдкрд╛рдЗрдкрд▓рд╛рдЗрдирдорд╛ рдХреНрд▓рд┐рдХ рдЧрд░реНрдЫреМрдВ, рд╣рд╛рдореАрд▓реЗ рдЪрд┐рддреНрд░ 4 рдЬрд╕реНрддреИ рдХреЗрд╣рд┐ рджреЗрдЦреНрдиреБрдкрд░реНрдЫред рдбрд┐рдмрдЧрд┐рдЩ рдЙрджреНрджреЗрд╢реНрдпрдХрд╛ рд▓рд╛рдЧрд┐, рд╡рд┐рд╕реНрддреГрдд рд▓рдЧрд╣рд░реВ рд╣реЗрд░реНрди рд▓рдЧрд╣рд░реВрдорд╛ рд░ рддреНрдпрд╕рдкрдЫрд┐ Stackdriver рдорд╛ рдЬрд╛рди рдзреЗрд░реИ рдЙрдкрдпреЛрдЧреА рд╣реБрди рд╕рдХреНрдЫред рдпрд╕рд▓реЗ рдорд▓рд╛рдИ рдзреЗрд░реИ рдХреЗрд╕рд╣рд░реВрдорд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рдорд╕реНрдпрд╛рд╣рд░реВ рд╕рдорд╛рдзрд╛рди рдЧрд░реНрди рдорджреНрджрдд рдЧрд░реЗрдХреЛ рдЫред

рд╣рд╛рдореА рд╕реНрдЯреНрд░рд┐рдо рдбрд╛рдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдЩ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдЫреМрдВред рднрд╛рдЧ 2
рдЪрд┐рддреНрд░ 4: рдмреАрдо рдХрдиреНрд╡реЗрдпрд░

BigQuery рдорд╛ рд╣рд╛рдореНрд░реЛ рдбреЗрдЯрд╛ рдкрд╣реБрдБрдЪ рдЧрд░реНрдиреБрд╣реЛрд╕реН

рддреНрдпрд╕реЛрднрдП, рд╣рд╛рдореАрд╕рдБрдЧ рдкрд╣рд┐рд▓реЗ рдиреИ рд╣рд╛рдореНрд░реЛ рддрд╛рд▓рд┐рдХрд╛рдорд╛ рдбрд╛рдЯрд╛ рдкреНрд░рд╡рд╛рд╣ рднрдПрдХреЛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд▓рд┐рд░рд╣реЗрдХреЛ рд╣реБрдиреБрдкрд░реНрдЫред рдпреЛ рдкрд░реАрдХреНрд╖рдг рдЧрд░реНрди, рд╣рд╛рдореА BigQuery рдорд╛ рдЧрдПрд░ рдбреЗрдЯрд╛ рд╣реЗрд░реНрди рд╕рдХреНрдЫреМрдВред рддрд▓рдХреЛ рдЖрджреЗрд╢ рдкреНрд░рдпреЛрдЧ рдЧрд░рд┐рд╕рдХреЗрдкрдЫрд┐ рддрдкрд╛рдИрдВрд▓реЗ рдбреЗрдЯрд╛рд╕реЗрдЯрдХреЛ рдкрд╣рд┐рд▓реЛ рдХреЗрд╣реА рдкрдЩреНрдХреНрддрд┐рд╣рд░реВ рджреЗрдЦреНрдиреБрдкрд░реНрдЫред рдЕрдм рд╣рд╛рдореАрд╕рдБрдЧ BigQuery рдорд╛ рднрдгреНрдбрд╛рд░рдг рдЧрд░рд┐рдПрдХреЛ рдбреЗрдЯрд╛ рдЫ, рд╣рд╛рдореА рдердк рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ, рд╕рд╛рдереИ рдбреЗрдЯрд╛ рд╕рд╣рдХрд░реНрдореАрд╣рд░реВрд╕рдБрдЧ рд╕рд╛рдЭреЗрджрд╛рд░реА рдЧрд░реНрди рд░ рд╡реНрдпрд╡рд╕рд╛рдпрд┐рдХ рдкреНрд░рд╢реНрдирд╣рд░реВрдХреЛ рдЬрд╡рд╛рдл рджрд┐рди рдерд╛рд▓реНрдЫреМрдВред

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

рд╣рд╛рдореА рд╕реНрдЯреНрд░рд┐рдо рдбрд╛рдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдЩ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдЫреМрдВред рднрд╛рдЧ 2
рдЪрд┐рддреНрд░ рел: BigQuery

рдирд┐рд╖реНрдХрд░реНрд╖рдорд╛

рд╣рд╛рдореА рдЖрд╢рд╛ рдЧрд░реНрджрдЫреМрдВ рдХрд┐ рдпреЛ рдкреЛрд╖реНрдЯрд▓реЗ рд╕реНрдЯреНрд░рд┐рдорд┐рдЩ рдбреЗрдЯрд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрдиреЗ, рд╕рд╛рдереИ рдбреЗрдЯрд╛рд▓рд╛рдИ рдердк рдкрд╣реБрдБрдЪрдпреЛрдЧреНрдп рдмрдирд╛рдЙрдиреЗ рддрд░рд┐рдХрд╛рд╣рд░реВ рдЦреЛрдЬреНрдиреЗ рдЙрдкрдпреЛрдЧреА рдЙрджрд╛рд╣рд░рдгрдХреЛ рд░реВрдкрдорд╛ рдХрд╛рдо рдЧрд░реНрджрдЫред рдпрд╕ рдврд╛рдБрдЪрд╛рдорд╛ рдбрд╛рдЯрд╛ рднрдгреНрдбрд╛рд░рдгрд▓реЗ рд╣рд╛рдореАрд▓рд╛рдИ рдзреЗрд░реИ рдлрд╛рдЗрджрд╛рд╣рд░реВ рджрд┐рдиреНрдЫред рдЕрдм рд╣рд╛рдореА рдорд╣рддреНрддреНрд╡рдкреВрд░реНрдг рдкреНрд░рд╢реНрдирд╣рд░реВрдХреЛ рдЬрд╡рд╛рдл рджрд┐рди рд╕реБрд░реБ рдЧрд░реНрди рд╕рдХреНрдЫреМрдВ рдЬрд╕реНрддреИ рдХрддрд┐ рдорд╛рдирд┐рд╕рд╣рд░реВрд▓реЗ рд╣рд╛рдореНрд░реЛ рдЙрддреНрдкрд╛рджрди рдкреНрд░рдпреЛрдЧ рдЧрд░реНрдЫрдиреН? рдХреЗ рддрдкрд╛рдЗрдБрдХреЛ рдкреНрд░рдпреЛрдЧрдХрд░реНрддрд╛ рдЖрдзрд╛рд░ рд╕рдордп рд╕рдВрдЧ рдмрдвреНрджреИ рдЫ? рдЙрддреНрдкрд╛рджрдирдХрд╛ рдХреБрди рдкрдХреНрд╖рд╣рд░реВрд╕рдБрдЧ рдорд╛рдирд┐рд╕рд╣рд░реВрд▓реЗ рд╕рдмреИрднрдиреНрджрд╛ рдмрдвреА рдЕрдиреНрддрд░рдХреНрд░рд┐рдпрд╛ рдЧрд░реНрдЫрдиреН? рд░ рддреНрдпрд╣рд╛рдБ рддреНрд░реБрдЯрд┐рд╣рд░реВ рдЫрдиреН рдЬрд╣рд╛рдБ рд╣реБрдиреБ рд╣реБрдБрджреИрди? рдпреА рдкреНрд░рд╢реНрдирд╣рд░реВ рд╣реБрдиреН рдЬреБрди рд╕рдВрд╕реНрдерд╛рдХреЛ рдЪрд╛рд╕реЛрдХреЛ рд╡рд┐рд╖рдп рд╣реБрдиреЗрдЫред рдпреА рдкреНрд░рд╢реНрдирд╣рд░реВрдХреЛ рдЬрд╡рд╛рдлрдмрд╛рдЯ рдЙрддреНрдкрдиреНрди рд╣реБрдиреЗ рдЕрдиреНрддрд░рджреГрд╖реНрдЯрд┐рд╣рд░реВрдХреЛ рдЖрдзрд╛рд░рдорд╛, рд╣рд╛рдореА рдЙрддреНрдкрд╛рджрди рд╕реБрдзрд╛рд░ рдЧрд░реНрди рд░ рдкреНрд░рдпреЛрдЧрдХрд░реНрддрд╛ рд╕рдВрд▓рдЧреНрдирддрд╛ рдмрдврд╛рдЙрди рд╕рдХреНрдЫреМрдВред

рдмреАрдо рдпрд╕ рдкреНрд░рдХрд╛рд░рдХреЛ рд╡реНрдпрд╛рдпрд╛рдордХреЛ рд▓рд╛рдЧрд┐ рд╕рд╛рдБрдЪреНрдЪреИ рдЙрдкрдпреЛрдЧреА рдЫ рд░ рд╕рд╛рдереИ рдЕрдиреНрдп рд░реЛрдЪрдХ рдкреНрд░рдпреЛрдЧ рдХреЗрд╕рд╣рд░реВ рдкрдирд┐ рдЫрдиреНред рдЙрджрд╛рд╣рд░рдгрдХрд╛ рд▓рд╛рдЧрд┐, рддрдкрд╛рдИрдВ рд╡рд╛рд╕реНрддрд╡рд┐рдХ рд╕рдордпрдорд╛ рд╕реНрдЯрдХ рдЯрд┐рдХ рдбрд╛рдЯрд╛рдХреЛ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдЧрд░реНрди рд░ рд╡рд┐рд╢реНрд▓реЗрд╖рдгрдорд╛ рдЖрдзрд╛рд░рд┐рдд рдЯреНрд░реЗрдбрд╣рд░реВ рдЧрд░реНрди рдЪрд╛рд╣рдиреБрд╣реБрдиреНрдЫ, рд╕рд╛рдпрдж рддрдкрд╛рдИрдВрд╕рдБрдЧ рд╕рд╡рд╛рд░реА рд╕рд╛рдзрдирд╣рд░реВрдмрд╛рдЯ рдЖрдЙрдиреЗ рд╕реЗрдиреНрд╕рд░ рдбреЗрдЯрд╛ рдЫ рд░ рдЯреНрд░рд╛рдлрд┐рдХ рд╕реНрддрд░ рдЧрдгрдирд╛рд╣рд░реВ рдЧрдгрдирд╛ рдЧрд░реНрди рдЪрд╛рд╣рдиреБрд╣реБрдиреНрдЫред рддрдкрд╛рдИрдВ рдкрдирд┐, рдЙрджрд╛рд╣рд░рдгрдХрд╛ рд▓рд╛рдЧрд┐, рдПрдХ рдЧреЗрдорд┐рдЩ рдХрдореНрдкрдиреА рд╣реБрди рд╕рдХреНрдиреБрд╣реБрдиреНрдЫ рдЬрд╕рд▓реЗ рдкреНрд░рдпреЛрдЧрдХрд░реНрддрд╛ рдбреЗрдЯрд╛ рд╕рдЩреНрдХрд▓рди рдЧрд░реНрджрдЫ рд░ рдореБрдЦреНрдп рдореЗрдЯреНрд░рд┐рдХреНрд╕ рдЯреНрд░реНрдпрд╛рдХ рдЧрд░реНрди рдбреНрдпрд╛рд╕рдмреЛрд░реНрдбрд╣рд░реВ рд╕рд┐рд░реНрдЬрдирд╛ рдЧрд░реНрди рдкреНрд░рдпреЛрдЧ рдЧрд░реНрджрдЫред рдареАрдХ рдЫ, рд╕рдЬреНрдЬрдирд╣рд░реВ, рдпреЛ рдЕрд░реНрдХреЛ рдкреЛрд╖реНрдЯрдХреЛ рд▓рд╛рдЧрд┐ рд╡рд┐рд╖рдп рд╣реЛ, рдкрдвреНрдирдХреЛ рд▓рд╛рдЧрд┐ рдзрдиреНрдпрд╡рд╛рдж, рд░ рдкреВрд░реНрдг рдХреЛрдб рд╣реЗрд░реНрди рдЪрд╛рд╣рдиреЗрд╣рд░реВрдХрд╛ рд▓рд╛рдЧрд┐, рддрд▓ рдореЗрд░реЛ GitHub рдХреЛ рд▓рд┐рдЩреНрдХ рдЫред

https://github.com/DFoly/User_log_pipeline

рдпрддрд┐ рдиреИред рднрд╛рдЧ рдПрдХ рдкрдвреНрдиреБрд╣реЛрд╕реН.

рд╕реНрд░реЛрдд: www.habr.com

рдПрдХ рдЯрд┐рдкреНрдкрдгреА рдердкреНрди