рдЖрдореНрд╣реА рд╕реНрдЯреНрд░реАрдо рдбреЗрдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдВрдЧ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рддрдпрд╛рд░ рдХрд░рддреЛ. рднрд╛рдЧ 2

рд╕рд░реНрд╡рд╛рдВрдирд╛ рдирдорд╕реНрдХрд╛рд░. рдЖрдореНрд╣реА рд▓реЗрдЦрд╛рдЪреНрдпрд╛ рд╢реЗрд╡рдЯрдЪреНрдпрд╛ рднрд╛рдЧрд╛рдЪреЗ рднрд╛рд╖рд╛рдВрддрд░ рд╢реЗрдЕрд░ рдХрд░рдд рдЖрд╣реЛрдд, рдЬреЗ рд╡рд┐рд╢реЗрд╖рддрдГ рдЕрднреНрдпрд╛рд╕рдХреНрд░рдорд╛рдЪреНрдпрд╛ рд╡рд┐рджреНрдпрд╛рд░реНрдереНрдпрд╛рдВрд╕рд╛рдареА рддрдпрд╛рд░ рдХреЗрд▓реЗ рдЖрд╣реЗ. рдбреЗрдЯрд╛ рдЕрднрд┐рдпрдВрддрд╛. рдкрд╣рд┐рд▓рд╛ рднрд╛рдЧ рддреБрдореНрд╣реА рд╡рд╛рдЪреВ рд╢рдХрддрд╛ рдпреЗрдереЗ.

рд░рд┐рдЕрд▓-рдЯрд╛рдЗрдо рдкрд╛рдЗрдкрд▓рд╛рдЗрдирд╕рд╛рдареА рдЕрдкрд╛рдЪреЗ рдмреАрдо рдЖрдгрд┐ рдбреЗрдЯрд╛рдлреНрд▓реЛ

рдЖрдореНрд╣реА рд╕реНрдЯреНрд░реАрдо рдбреЗрдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдВрдЧ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рддрдпрд╛рд░ рдХрд░рддреЛ. рднрд╛рдЧ 2

Google рдХреНрд▓рд╛рдЙрдб рд╕реЗрдЯ рдХрд░рдд рдЖрд╣реЗ

рдЯреАрдк: рдореА рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд╛рд▓рд╡рдгреНрдпрд╛рд╕рд╛рдареА рдЖрдгрд┐ рд╕рд╛рдиреБрдХреВрд▓ рд▓реЙрдЧ рдбреЗрдЯрд╛ рдкреНрд░рдХрд╛рд╢рд┐рдд рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА Google рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓рдЪрд╛ рд╡рд╛рдкрд░ рдХреЗрд▓рд╛ рдХрд╛рд░рдг рдорд▓рд╛ рдкрд╛рдпрдерди 3 рдордзреНрдпреЗ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд╛рд▓рд╡рддрд╛рдирд╛ рд╕рдорд╕реНрдпрд╛ рдпреЗрдд рд╣реЛрддреА. Google рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓ рдкрд╛рдпрдерди 2 рд╡рд╛рдкрд░рддреЗ, рдЬреЗ Apache Beam рд╢реА рдЕрдзрд┐рдХ рд╕реБрд╕рдВрдЧрдд рдЖрд╣реЗ.

рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕реБрд░реВ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА, рдЖрдореНрд╣рд╛рд▓рд╛ рд╕реЗрдЯрд┐рдВрдЧреНрдЬрдордзреНрдпреЗ рдереЛрдбреЗрд╕реЗ рдЦреЛрджрдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ. рддреБрдордЪреНрдпрд╛рдкреИрдХреА рдЬреНрдпрд╛рдВрдиреА рдЖрдзреА GCP рд╡рд╛рдкрд░рд▓рд╛ рдирд╛рд╣реА рддреНрдпрд╛рдВрдЪреНрдпрд╛рд╕рд╛рдареА, рддреБрдореНрд╣рд╛рд▓рд╛ рдпрд╛рдордзреНрдпреЗ рд╡рд░реНрдгрди рдХреЗрд▓реЗрд▓реНрдпрд╛ рдЦрд╛рд▓реАрд▓ 6 рдкрд╛рдпрд▒реНрдпрд╛ рдлреЙрд▓реЛ рдХрд░рд╛рд╡реНрдпрд╛ рд▓рд╛рдЧрддреАрд▓ рдкреГрд╖реНрда.

рдпрд╛рдирдВрддрд░, рдЖрдореНрд╣рд╛рд▓рд╛ рдЖрдордЪреНрдпрд╛ рд╕реНрдХреНрд░рд┐рдкреНрдЯреНрд╕ Google Cloud Storage рд╡рд░ рдЕрдкрд▓реЛрдб рдХрд░рд╛рд╡реНрдпрд╛ рд▓рд╛рдЧрддреАрд▓ рдЖрдгрд┐ рддреНрдпрд╛ рдЖрдордЪреНрдпрд╛ Google Cloud Shel рд╡рд░ рдХреЙрдкреА рдХрд░рд╛рд╡реНрдпрд╛ рд▓рд╛рдЧрддреАрд▓. рдХреНрд▓рд╛рдЙрдб рд╕реНрдЯреЛрд░реЗрдЬрд╡рд░ рдЕрдкрд▓реЛрдб рдХрд░рдгреЗ рдЕрдЧрджреА рдХреНрд╖реБрд▓реНрд▓рдХ рдЖрд╣реЗ (рд╡рд░реНрдгрди рд╕рд╛рдкрдбреВ рд╢рдХрддреЗ рдпреЗрдереЗ). рдЖрдордЪреНрдпрд╛ рдлрд╛рдпрд▓реА рдХреЙрдкреА рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА, рдЖрдореНрд╣реА рдЦрд╛рд▓реАрд▓ рдЖрдХреГрддреА 2 рдордзреАрд▓ рдбрд╛рд╡реАрдХрдбреАрд▓ рдкрд╣рд┐рд▓реНрдпрд╛ рдЪрд┐рдиреНрд╣рд╛рд╡рд░ рдХреНрд▓рд┐рдХ рдХрд░реВрди рдЯреВрд▓рдмрд╛рд░рд╡рд░реВрди Google рдХреНрд▓рд╛рдЙрдб рд╢реЗрд▓ рдЙрдШрдбреВ рд╢рдХрддреЛ.

рдЖрдореНрд╣реА рд╕реНрдЯреНрд░реАрдо рдбреЗрдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдВрдЧ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рддрдпрд╛рд░ рдХрд░рддреЛ. рднрд╛рдЧ 2
2 рдЖрдХреГрддреА

рдлрд╛рдпрд▓реА рдХреЙрдкреА рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЖрдгрд┐ рдЖрд╡рд╢реНрдпрдХ рд▓рд╛рдпрдмреНрд░рд░реА рд╕реНрдерд╛рдкрд┐рдд рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЖрдореНрд╣рд╛рд▓рд╛ рдЖрд╡рд╢реНрдпрдХ рдЕрд╕рд▓реЗрд▓реНрдпрд╛ рдХрдорд╛рдВрдб рдЦрд╛рд▓реА рд╕реВрдЪреАрдмрджреНрдз рдЖрд╣реЗрдд.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

рдЖрдордЪрд╛ рдбреЗрдЯрд╛рдмреЗрд╕ рдЖрдгрд┐ рдЯреЗрдмрд▓ рддрдпрд╛рд░ рдХрд░рдд рдЖрд╣реЗ

рдПрдХрджрд╛ рдЖрдореНрд╣реА рд╕реЗрдЯрдЕрдк рд╕рдВрдмрдВрдзрд┐рдд рд╕рд░реНрд╡ рдкрд╛рдпрд▒реНрдпрд╛ рдкреВрд░реНрдг рдХреЗрд▓реНрдпрд╛рд╡рд░, рдЖрдореНрд╣рд╛рд▓рд╛ рдкреБрдвреАрд▓ рдЧреЛрд╖реНрдЯ рдХрд░рд╛рдпрдЪреА рдЖрд╣реЗ рддреА рдореНрд╣рдгрдЬреЗ BigQuery рдордзреНрдпреЗ рдбреЗрдЯрд╛рд╕реЗрдЯ рдЖрдгрд┐ рдЯреЗрдмрд▓ рддрдпрд╛рд░ рдХрд░рдгреЗ. рд╣реЗ рдХрд░рдгреНрдпрд╛рдЪреЗ рдЕрдиреЗрдХ рдорд╛рд░реНрдЧ рдЖрд╣реЗрдд, рдкрд░рдВрддреБ рд╕рд░реНрд╡рд╛рдд рд╕реЛрдкрд╛ рдореНрд╣рдгрдЬреЗ рдкреНрд░рдердо рдбреЗрдЯрд╛рд╕реЗрдЯ рддрдпрд╛рд░ рдХрд░реВрди Google рдХреНрд▓рд╛рдЙрдб рдХрдиреНрд╕реЛрд▓ рд╡рд╛рдкрд░рдгреЗ. рддреБрдореНрд╣реА рдЦрд╛рд▓реАрд▓ рдкрд╛рдпрд▒реНрдпрд╛ рдлреЙрд▓реЛ рдХрд░реВ рд╢рдХрддрд╛ рджреБрд╡рд╛рд╕реНрдХреАрдорд╛рд╕рд╣ рдЯреЗрдмрд▓ рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА. рдЖрдордЪреНрдпрд╛ рдЯреЗрдмрд▓ рдЕрд╕реЗрд▓ 7 рд╕реНрддрдВрдн, рдкреНрд░рддреНрдпреЗрдХ рд╡рд╛рдкрд░рдХрд░реНрддрд╛ рд▓реЙрдЧрдЪреНрдпрд╛ рдШрдЯрдХрд╛рдВрд╢реА рд╕рдВрдмрдВрдзрд┐рдд. рд╕реЛрдпреАрд╕рд╛рдареА, рдЖрдореНрд╣реА рдЯрд╛рдЗрдорд▓реЛрдХрд▓ рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓ рд╡рдЧрд│рддрд╛ рд╕рд░реНрд╡ рдХреЙрд▓рдореНрд╕ рд╕реНрдЯреНрд░рд┐рдВрдЧреНрд╕ рдореНрд╣рдгреВрди рдкрд░рд┐рднрд╛рд╖рд┐рдд рдХрд░реВ рдЖрдгрд┐ рдЖрдореНрд╣реА рдЖрдзреА рддрдпрд╛рд░ рдХреЗрд▓реЗрд▓реНрдпрд╛ рд╡реНрд╣реЗрд░рд┐рдПрдмрд▓реНрд╕рдиреБрд╕рд╛рд░ рддреНрдпрд╛рдВрдЪреА рдирд╛рд╡реЗ рджреЗрдК. рдЖрдордЪреНрдпрд╛ рд╕рд╛рд░рдгреАрдЪрд╛ рд▓реЗрдЖрдЙрдЯ рдЖрдХреГрддреА 3 рдкреНрд░рдорд╛рдгреЗ рджрд┐рд╕рд▓рд╛ рдкрд╛рд╣рд┐рдЬреЗ.

рдЖрдореНрд╣реА рд╕реНрдЯреНрд░реАрдо рдбреЗрдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдВрдЧ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рддрдпрд╛рд░ рдХрд░рддреЛ. рднрд╛рдЧ 2
рдЖрдХреГрддреА 3. рдЯреЗрдмрд▓ рд▓реЗрдЖрдЙрдЯ

рд╡рд╛рдкрд░рдХрд░реНрддрд╛ рд▓реЙрдЧ рдбреЗрдЯрд╛ рдкреНрд░рдХрд╛рд╢рд┐рдд рдХрд░рдд рдЖрд╣реЗ

Pub/Sub рд╣рд╛ рдЖрдордЪреНрдпрд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрдирдЪрд╛ рдПрдХ рдорд╣рддреНрддреНрд╡рд╛рдЪрд╛ рдШрдЯрдХ рдЖрд╣реЗ рдХрд╛рд░рдг рддреЗ рдПрдХрд╛рдзрд┐рдХ рд╕реНрд╡рддрдВрддреНрд░ рдЕрдиреБрдкреНрд░рдпреЛрдЧрд╛рдВрдирд╛ рдПрдХрдореЗрдХрд╛рдВрд╢реА рд╕рдВрд╡рд╛рдж рд╕рд╛рдзрдгреНрдпрд╛рдЪреА рдкрд░рд╡рд╛рдирдЧреА рджреЗрддреЗ. рд╡рд┐рд╢реЗрд╖рддрдГ, рддреЗ рдПрдХ рдордзреНрдпрд╕реНрде рдореНрд╣рдгреВрди рдХрд╛рд░реНрдп рдХрд░рддреЗ рдЬреЗ рдЖрдореНрд╣рд╛рд▓рд╛ рдЕрдиреБрдкреНрд░рдпреЛрдЧрд╛рдВрдордзреНрдпреЗ рд╕рдВрджреЗрд╢ рдкрд╛рдард╡рд┐рдгреНрдпрд╛рд╕ рдЖрдгрд┐ рдкреНрд░рд╛рдкреНрдд рдХрд░рдгреНрдпрд╛рд╕ рдЕрдиреБрдорддреА рджреЗрддреЗ. рдкреНрд░рдердо рдЖрдкрд▓реНрдпрд╛рд▓рд╛ рдПрдХ рд╡рд┐рд╖рдп рддрдпрд╛рд░ рдХрд░рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ. рдХрдиреНрд╕реЛрд▓рдордзреНрдпреЗ рдлрдХреНрдд Pub/Sub рд╡рд░ рдЬрд╛ рдЖрдгрд┐ CREATE TOPIC рд╡рд░ рдХреНрд▓рд┐рдХ рдХрд░рд╛.

рд╡рд░ рдкрд░рд┐рднрд╛рд╖рд┐рдд рдХреЗрд▓реЗрд▓рд╛ рд▓реЙрдЧ рдбреЗрдЯрд╛ рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдЦрд╛рд▓реАрд▓ рдХреЛрдб рдЖрдордЪреНрдпрд╛ рд╕реНрдХреНрд░рд┐рдкреНрдЯрд▓рд╛ рдХреЙрд▓ рдХрд░рддреЛ рдЖрдгрд┐ рдирдВрддрд░ рд▓реЙрдЧ рдЬреЛрдбрддреЛ рдЖрдгрд┐ Pub/Sub рд▓рд╛ рдкрд╛рдард╡рддреЛ. рдЖрдкрд▓реНрдпрд╛рд▓рд╛ рдлрдХреНрдд рдПрдХ рд╡рд╕реНрддреВ рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рдЪреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рдЖрд╣реЗ PublisherClient, рдкрджреНрдзрдд рд╡рд╛рдкрд░реВрди рд╡рд┐рд╖рдпрд╛рдЪрд╛ рдорд╛рд░реНрдЧ рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░рд╛ topic_path рдЖрдгрд┐ рдлрдВрдХреНрд╢рдирд▓рд╛ рдХреЙрд▓ рдХрд░рд╛ publish ╤Б topic_path рдЖрдгрд┐ рдбреЗрдЯрд╛. рдХреГрдкрдпрд╛ рд▓рдХреНрд╖рд╛рдд рдареЗрд╡рд╛ рдХреА рдЖрдореНрд╣реА рдЖрдпрд╛рдд рдХрд░рддреЛ generate_log_line рдЖрдордЪреНрдпрд╛ рд╕реНрдХреНрд░рд┐рдкреНрдЯрдордзреВрди stream_logs, рддреНрдпрд╛рдореБрд│реЗ рдпрд╛ рдлрд╛рдЗрд▓реНрд╕ рдПрдХрд╛рдЪ рдлреЛрд▓реНрдбрд░рдордзреНрдпреЗ рдЕрд╕рд▓реНрдпрд╛рдЪреА рдЦрд╛рддреНрд░реА рдХрд░рд╛, рдЕрдиреНрдпрдерд╛ рддреБрдореНрд╣рд╛рд▓рд╛ рдЖрдпрд╛рдд рддреНрд░реБрдЯреА рдорд┐рд│реЗрд▓. рддреНрдпрд╛рдирдВрддрд░ рдЖрдореНрд╣реА рд╣реЗ рд╡рд╛рдкрд░реВрди рдЖрдордЪреНрдпрд╛ Google рдХрдиреНрд╕реЛрд▓рджреНрд╡рд╛рд░реЗ рдЪрд╛рд▓рд╡реВ рд╢рдХрддреЛ:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

рдлрд╛рдЗрд▓ рдЪрд╛рд▓реВ рд╣реЛрддрд╛рдЪ, рдЖрдореНрд╣реА рдЦрд╛рд▓реАрд▓ рдЖрдХреГрддреАрдордзреНрдпреЗ рджрд░реНрд╢рд╡рд┐рд▓реНрдпрд╛рдкреНрд░рдорд╛рдгреЗ, рдХрдиреНрд╕реЛрд▓рд╡рд░ рд▓реЙрдЧ рдбреЗрдЯрд╛рдЪреЗ рдЖрдЙрдЯрдкреБрдЯ рдкрд╛рд╣рдгреНрдпрд╛рд╕ рд╕рдХреНрд╖рдо рд╣реЛрдК. рдЬреЛрдкрд░реНрдпрдВрдд рдЖрдореНрд╣реА рд╡рд╛рдкрд░рдд рдирд╛рд╣реА рддреЛрдкрд░реНрдпрдВрдд рд╣реА рд╕реНрдХреНрд░рд┐рдкреНрдЯ рдЪрд╛рд▓реЗрд▓ CTRL + рд╕реАрддреЗ рдкреВрд░реНрдг рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА.

рдЖрдореНрд╣реА рд╕реНрдЯреНрд░реАрдо рдбреЗрдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдВрдЧ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рддрдпрд╛рд░ рдХрд░рддреЛ. рднрд╛рдЧ 2
рдЖрдХреГрддреА 4. рдЖрдЙрдЯрдкреБрдЯ publish_logs.py

рдЖрдордЪрд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдХреЛрдб рд▓рд┐рд╣рд┐рдд рдЖрд╣реЗ

рдЖрддрд╛ рдЖрдореНрд╣реА рд╕рд░реНрд╡рдХрд╛рд╣реА рддрдпрд╛рд░ рдХреЗрд▓реЗ рдЖрд╣реЗ, рдЖрдореНрд╣реА рдордЬреЗрджрд╛рд░ рднрд╛рдЧ рд╕реБрд░реВ рдХрд░реВ рд╢рдХрддреЛ - рдмреАрдо рдЖрдгрд┐ рдкрд╛рдпрдерди рд╡рд╛рдкрд░реВрди рдЖрдордЪреНрдпрд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрдирдЪреЗ рдХреЛрдбрд┐рдВрдЧ. рдмреАрдо рдкрд╛рдЗрдкрд▓рд╛рдЗрди рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА, рдЖрдореНрд╣рд╛рд▓рд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдСрдмреНрдЬреЗрдХреНрдЯ (рдкреА) рддрдпрд╛рд░ рдХрд░рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ. рдПрдХрджрд╛ рдЖрдкрдг рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдСрдмреНрдЬреЗрдХреНрдЯ рддрдпрд╛рд░ рдХреЗрд▓реНрдпрд╛рд╡рд░, рдЖрдкрдг рдСрдкрд░реЗрдЯрд░ рд╡рд╛рдкрд░реВрди рдПрдХрд╛рдорд╛рдЧреВрди рдПрдХ рдЕрдиреЗрдХ рдлрдВрдХреНрд╢рдиреНрд╕ рд▓рд╛рдЧреВ рдХрд░реВ рд╢рдХрддреЛ pipe (|). рд╕рд░реНрд╡рд╕рд╛рдзрд╛рд░рдгрдкрдгреЗ, рд╡рд░реНрдХрдлреНрд▓реЛ рдЦрд╛рд▓реАрд▓ рдкреНрд░рддрд┐рдореЗрдкреНрд░рдорд╛рдгреЗ рджрд┐рд╕рддреЗ.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

рдЖрдордЪреНрдпрд╛ рдХреЛрдбрдордзреНрдпреЗ, рдЖрдореНрд╣реА рджреЛрди рдХрд╕реНрдЯрдо рдлрдВрдХреНрд╢рдиреНрд╕ рддрдпрд╛рд░ рдХрд░реВ. рдХрд╛рд░реНрдп regex_clean, рдЬреЗ рдбреЗрдЯрд╛ рд╕реНрдХреЕрди рдХрд░рддреЗ рдЖрдгрд┐ рдлрдВрдХреНрд╢рди рд╡рд╛рдкрд░реВрди PATTERNS рд╕реВрдЪреАрд╡рд░ рдЖрдзрд╛рд░рд┐рдд рд╕рдВрдмрдВрдзрд┐рдд рдкрдВрдХреНрддреА рдкреБрдирд░реНрдкреНрд░рд╛рдкреНрдд рдХрд░рддреЗ re.search. рдлрдВрдХреНрд╢рди рд╕реНрд╡рд▓реНрдкрд╡рд┐рд░рд╛рдорд╛рдиреЗ рд╡рд┐рднрдХреНрдд рдХреЗрд▓реЗрд▓реА рд╕реНрдЯреНрд░рд┐рдВрдЧ рдорд┐рд│рд╡рддреЗ. рддреБрдореНрд╣реА рдирд┐рдпрдорд┐рдд рдЕрднрд┐рд╡реНрдпрдХреНрддреА рддрдЬреНрдЮ рдирд╕рд▓реНрдпрд╛рд╕, рдореА рд╣реЗ рддрдкрд╛рд╕рдгреНрдпрд╛рдЪреА рд╢рд┐рдлрд╛рд░рд╕ рдХрд░рддреЛ рдЯреНрдпреВрдЯреЛрд░рд┐рдпрд▓ рдЖрдгрд┐ рдХреЛрдб рддрдкрд╛рд╕рдгреНрдпрд╛рд╕рд╛рдареА рдиреЛрдЯрдкреЕрдбрдордзреНрдпреЗ рд╕рд░рд╛рд╡ рдХрд░рд╛. рдпрд╛рдирдВрддрд░ рдЖрдореНрд╣реА рдХрд╕реНрдЯрдо ParDo рдлрдВрдХреНрд╢рди рдкрд░рд┐рднрд╛рд╖рд┐рдд рдХрд░рддреЛ рд╕реНрдкреНрд▓рд┐рдЯ, рдЬреА рд╕рдорд╛рдВрддрд░ рдкреНрд░рдХреНрд░рд┐рдпреЗрд╕рд╛рдареА рдмреАрдо рдЯреНрд░рд╛рдиреНрд╕рдлреЙрд░реНрдордЪреА рднрд┐рдиреНрдирддрд╛ рдЖрд╣реЗ. рдкрд╛рдпрдердирдордзреНрдпреЗ, рд╣реЗ рдПрдХрд╛ рд╡рд┐рд╢реЗрд╖ рдкреНрд░рдХрд╛рд░реЗ рдХреЗрд▓реЗ рдЬрд╛рддреЗ - рдЖрдкрдг DoFn рдмреАрдо рд╡рд░реНрдЧрд╛рдХрдбреВрди рд╡рд╛рд░рд╕рд╛ рдорд┐рд│рд╛рд▓реЗрд▓рд╛ рд╡рд░реНрдЧ рддрдпрд╛рд░ рдХреЗрд▓рд╛ рдкрд╛рд╣рд┐рдЬреЗ. рд╕реНрдкреНрд▓рд┐рдЯ рдлрдВрдХреНрд╢рди рдорд╛рдЧреАрд▓ рдлрдВрдХреНрд╢рдирдордзреВрди рдкрд╛рд░реНрд╕ рдХреЗрд▓реЗрд▓реА рдкрдВрдХреНрддреА рдШреЗрддреЗ рдЖрдгрд┐ рдЖрдордЪреНрдпрд╛ BigQuery рдЯреЗрдмрд▓рдордзреАрд▓ рд╕реНрддрдВрдн рдирд╛рд╡рд╛рдВрд╢реА рд╕рдВрдмрдВрдзрд┐рдд рдХреА рд╕рд╣ рд╢рдмреНрджрдХреЛрд╢рд╛рдВрдЪреА рд╕реВрдЪреА рдорд┐рд│рд╡рддреЗ. рдпрд╛ рдХрд╛рд░реНрдпрд╛рдмрджреНрджрд▓ рд▓рдХреНрд╖рд╛рдд рдШреЗрдгреНрдпрд╛рд╕рд╛рд░рдЦреЗ рдХрд╛рд╣реАрддрд░реА рдЖрд╣реЗ: рдорд▓рд╛ рдЖрдпрд╛рдд рдХрд░рд╛рд╡реЗ рд▓рд╛рдЧрд▓реЗ datetime рдХрд╛рд░реНрдп рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдлрдВрдХреНрд╢рдирдЪреНрдпрд╛ рдЖрдд. рдлрд╛рдИрд▓рдЪреНрдпрд╛ рд╕реБрд░реБрд╡рд╛рддреАрд▓рд╛ рдорд▓рд╛ рдЖрдпрд╛рдд рддреНрд░реБрдЯреА рдкреНрд░рд╛рдкреНрдд рд╣реЛрдд рд╣реЛрддреА, рдЬреА рд╡рд┐рдЪрд┐рддреНрд░ рд╣реЛрддреА. рд╣реА рдпрд╛рджреА рдирдВрддрд░ рдлрдВрдХреНрд╢рдирд▓рд╛ рджрд┐рд▓реА рдЬрд╛рддреЗ ToBigQuery рд▓рд┐рд╣рд╛, рдЬреЗ рдлрдХреНрдд рдЖрдордЪрд╛ рдбреЗрдЯрд╛ рдЯреЗрдмрд▓рдордзреНрдпреЗ рдЬреЛрдбрддреЗ. рдмреЕрдЪ рдбреЗрдЯрд╛рдлреНрд▓реЛ рдЬреЙрдм рдЖрдгрд┐ рд╕реНрдЯреНрд░реАрдорд┐рдВрдЧ рдбреЗрдЯрд╛рдлреНрд▓реЛ рдЬреЙрдмрд╕рд╛рдареА рдХреЛрдб рдЦрд╛рд▓реА рджрд┐рд▓реЗрд▓рд╛ рдЖрд╣реЗ. рдмреЕрдЪ рдЖрдгрд┐ рд╕реНрдЯреНрд░реАрдорд┐рдВрдЧ рдХреЛрдбрдордзреАрд▓ рдлрд░рдХ рдПрд╡рдврд╛рдЪ рдЖрд╣реЗ рдХреА рдмреЕрдЪрдордзреНрдпреЗ рдЖрдкрдг CSV рд╡рд░реВрди рд╡рд╛рдЪрддреЛ src_pathрдлрдВрдХреНрд╢рди рд╡рд╛рдкрд░реВрди ReadFromText рдмреАрдо рдкрд╛рд╕реВрди.

рдмреЕрдЪ рдбреЗрдЯрд╛рдлреНрд▓реЛ рдЬреЙрдм (рдмреЕрдЪ рдкреНрд░реЛрд╕реЗрд╕рд┐рдВрдЧ)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

рд╕реНрдЯреНрд░реАрдорд┐рдВрдЧ рдбреЗрдЯрд╛рдлреНрд▓реЛ рдЬреЙрдм (рд╕реНрдЯреНрд░реАрдо рдкреНрд░реЛрд╕реЗрд╕рд┐рдВрдЧ)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

рдХрдиреНрд╡реНрд╣реЗрдпрд░ рд╕реБрд░реВ рдХрд░рдд рдЖрд╣реЗ

рдЖрдореНрд╣реА рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╡реЗрдЧрд╡реЗрдЧрд│реНрдпрд╛ рдкреНрд░рдХрд╛рд░реЗ рдЪрд╛рд▓рд╡реВ рд╢рдХрддреЛ. рдЖрдореНрд╣рд╛рд▓рд╛ рд╣рд╡реЗ рдЕрд╕рд▓реНрдпрд╛рд╕, рдЖрдореНрд╣реА рджреВрд░рд╕реНрдердкрдгреЗ GCP рдордзреНрдпреЗ рд▓реЙрдЧ рдЗрди рдХрд░рддрд╛рдирд╛ рдЯрд░реНрдорд┐рдирд▓рд╡рд░реВрди рд╕реНрдерд╛рдирд┐рдХрд░рд┐рддреНрдпрд╛ рдЪрд╛рд▓рд╡реВ рд╢рдХрддреЛ.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

рддрдерд╛рдкрд┐, рдЖрдореНрд╣реА рддреЗ DataFlow рд╡рд╛рдкрд░реВрди рдЪрд╛рд▓рд╡рдгрд╛рд░ рдЖрд╣реЛрдд. рдЦрд╛рд▓реАрд▓ рдЖрд╡рд╢реНрдпрдХ рдкреЕрд░рд╛рдореАрдЯрд░реНрд╕ рд╕реЗрдЯ рдХрд░реВрди рдЖрдкрдг рдЦрд╛рд▓реАрд▓ рдХрдорд╛рдВрдб рд╡рд╛рдкрд░реВрди рд╣реЗ рдХрд░реВ рд╢рдХрддреЛ.

  • project тАФ рддреБрдордЪреНрдпрд╛ GCP рдкреНрд░рдХрд▓реНрдкрд╛рдЪрд╛ рдЖрдпрдбреА.
  • runner рдПрдХ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд░рдирд░ рдЖрд╣реЗ рдЬреЛ рддреБрдордЪреНрдпрд╛ рдкреНрд░реЛрдЧреНрд░рд╛рдордЪреЗ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдХрд░реЗрд▓ рдЖрдгрд┐ рддреБрдордЪреА рдкрд╛рдЗрдкрд▓рд╛рдЗрди рддрдпрд╛рд░ рдХрд░реЗрд▓. рдХреНрд▓рд╛рдЙрдбрдордзреНрдпреЗ рдЪрд╛рд▓рдгреНрдпрд╛рд╕рд╛рдареА, рддреБрдореНрд╣реА DataflowRunner рдирд┐рд░реНрджрд┐рд╖реНрдЯ рдХрд░рдгреЗ рдЖрд╡рд╢реНрдпрдХ рдЖрд╣реЗ.
  • staging_location тАФ рдХрд╛рдо рдХрд░рдд рдЕрд╕рд▓реЗрд▓реНрдпрд╛ рдкреНрд░реЛрд╕реЗрд╕рд░рд▓рд╛ рдЖрд╡рд╢реНрдпрдХ рдЕрд╕рд▓реЗрд▓реНрдпрд╛ рдЗрдВрдбреЗрдХреНрд╕рд┐рдВрдЧ рдХреЛрдб рдкреЕрдХреЗрдЬреЗрд╕рд╕рд╛рдареА рдХреНрд▓рд╛рдЙрдб рдбреЗрдЯрд╛рдлреНрд▓реЛ рдХреНрд▓рд╛рдЙрдб рд╕реНрдЯреЛрд░реЗрдЬрдЪрд╛ рдорд╛рд░реНрдЧ.
  • temp_location тАФ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЪрд╛рд▓реВ рдЕрд╕рддрд╛рдирд╛ рддрдпрд╛рд░ рдХреЗрд▓реЗрд▓реНрдпрд╛ рддрд╛рддреНрдкреБрд░рддреНрдпрд╛ рдЬреЙрдм рдлрд╛рдЗрд▓реНрд╕ рд╕рдВрдЪрдпрд┐рдд рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдХреНрд▓рд╛рдЙрдб рдбреЗрдЯрд╛рдлреНрд▓реЛ рдХреНрд▓рд╛рдЙрдб рд╕реНрдЯреЛрд░реЗрдЬрдЪрд╛ рдорд╛рд░реНрдЧ.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

рд╣реА рдХрдорд╛рдВрдб рдЪрд╛рд▓реВ рдЕрд╕рддрд╛рдирд╛, рдЖрдореНрд╣реА google рдХрдиреНрд╕реЛрд▓рдордзреАрд▓ DataFlow рдЯреЕрдмрд╡рд░ рдЬрд╛рдКрди рдЖрдордЪреА рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдкрд╛рд╣реВ рд╢рдХрддреЛ. рдЬреЗрд╡реНрд╣рд╛ рдЖрдкрдг рдкрд╛рдЗрдкрд▓рд╛рдЗрдирд╡рд░ рдХреНрд▓рд┐рдХ рдХрд░рддреЛ, рддреЗрд╡реНрд╣рд╛ рдЖрдкрд▓реНрдпрд╛рд▓рд╛ рдЖрдХреГрддреА 4 рд╕рд╛рд░рдЦреЗ рдХрд╛рд╣реАрддрд░реА рджрд┐рд╕рд▓реЗ рдкрд╛рд╣рд┐рдЬреЗ. рдбреАрдмрдЧрд┐рдВрдЧрдЪреНрдпрд╛ рдЙрджреНрджреЗрд╢рд╛рдиреЗ, рддрдкрд╢реАрд▓рд╡рд╛рд░ рд▓реЙрдЧ рдкрд╛рд╣рдгреНрдпрд╛рд╕рд╛рдареА рд▓реЙрдЧ рдЖрдгрд┐ рдирдВрддрд░ Stackdriver рд╡рд░ рдЬрд╛рдгреЗ рдЦреВрдк рдЙрдкрдпреБрдХреНрдд рдард░реВ рд╢рдХрддреЗ. рдпрд╛рдореБрд│реЗ рдорд▓рд╛ рдЕрдиреЗрдХ рдкреНрд░рдХрд░рдгрд╛рдВрдордзреНрдпреЗ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рд╕рдорд╕реНрдпрд╛рдВрдЪреЗ рдирд┐рд░рд╛рдХрд░рдг рдХрд░рдгреНрдпрд╛рдд рдорджрдд рдЭрд╛рд▓реА рдЖрд╣реЗ.

рдЖрдореНрд╣реА рд╕реНрдЯреНрд░реАрдо рдбреЗрдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдВрдЧ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рддрдпрд╛рд░ рдХрд░рддреЛ. рднрд╛рдЧ 2
рдЖрдХреГрддреА 4: рдмреАрдо рдХрдиреНрд╡реНрд╣реЗрдпрд░

BigQuery рдордзреНрдпреЗ рдЖрдордЪреНрдпрд╛ рдбреЗрдЯрд╛рдордзреНрдпреЗ рдкреНрд░рд╡реЗрд╢ рдХрд░рд╛

рддрд░, рдЖрдордЪреНрдпрд╛ рдЯреЗрдмрд▓рдордзреНрдпреЗ рдбреЗрдЯрд╛ рдкреНрд░рд╡рд╛рд╣рд┐рдд рдХрд░рдгрд╛рд░реА рдкрд╛рдЗрдкрд▓рд╛рдЗрди рдЖрдзреАрдкрд╛рд╕реВрдирдЪ рдЪрд╛рд▓реВ рдЕрд╕рд╛рд╡реА. рдпрд╛рдЪреА рдЪрд╛рдЪрдгреА рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА, рдЖрдореНрд╣реА BigQuery рд╡рд░ рдЬрд╛рдКрди рдбреЗрдЯрд╛ рдкрд╛рд╣реВ рд╢рдХрддреЛ. рдЦрд╛рд▓реАрд▓ рдХрдорд╛рдВрдб рд╡рд╛рдкрд░рд▓реНрдпрд╛рдирдВрддрд░ рддреБрдореНрд╣рд╛рд▓рд╛ рдбреЗрдЯрд╛рд╕реЗрдЯрдЪреНрдпрд╛ рдкрд╣рд┐рд▓реНрдпрд╛ рдХрд╛рд╣реА рдкрдВрдХреНрддреА рджрд┐рд╕рд▓реНрдпрд╛ рдкрд╛рд╣рд┐рдЬреЗрдд. рдЖрддрд╛ рдЖрдордЪреНрдпрд╛рдХрдбреЗ BigQuery рдордзреНрдпреЗ рдбреЗрдЯрд╛ рд╕рдВрдЧреНрд░рд╣рд┐рдд рдЖрд╣реЗ, рдЖрдореНрд╣реА рдкреБрдвреАрд▓ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдХрд░реВ рд╢рдХрддреЛ, рддрд╕реЗрдЪ рд╕рд╣рдХрд╛рд░реНтАНрдпрд╛рдВрд╕реЛрдмрдд рдбреЗрдЯрд╛ рд╢реЗрдЕрд░ рдХрд░реВ рд╢рдХрддреЛ рдЖрдгрд┐ рд╡реНрдпрд╛рд╡рд╕рд╛рдпрд┐рдХ рдкреНрд░рд╢реНрдирд╛рдВрдЪреА рдЙрддреНрддрд░реЗ рджреЗрдгреЗ рд╕реБрд░реВ рдХрд░реВ рд╢рдХрддреЛ.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

рдЖрдореНрд╣реА рд╕реНрдЯреНрд░реАрдо рдбреЗрдЯрд╛ рдкреНрд░реЛрд╕реЗрд╕рд┐рдВрдЧ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рддрдпрд╛рд░ рдХрд░рддреЛ. рднрд╛рдЧ 2
рдЖрдХреГрддреА 5: BigQuery

рдирд┐рд╖реНрдХрд░реНрд╖

рдЖрдореНрд╣рд╛рд▓рд╛ рдЖрд╢рд╛ рдЖрд╣реЗ рдХреА рд╣реЗ рдкреЛрд╕реНрдЯ рд╕реНрдЯреНрд░реАрдорд┐рдВрдЧ рдбреЗрдЯрд╛ рдкрд╛рдЗрдкрд▓рд╛рдЗрди рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рддрд╕реЗрдЪ рдбреЗрдЯрд╛ рдЕрдзрд┐рдХ рдкреНрд░рд╡реЗрд╢рдпреЛрдЧреНрдп рдмрдирд╡рдгреНрдпрд╛рдЪреЗ рдорд╛рд░реНрдЧ рд╢реЛрдзрдгреНрдпрд╛рдЪреЗ рдПрдХ рдЙрдкрдпреБрдХреНрдд рдЙрджрд╛рд╣рд░рдг рдореНрд╣рдгреВрди рдХрд╛рдо рдХрд░реЗрд▓. рдпрд╛ рдлреЙрд░рдореЕрдЯрдордзреНрдпреЗ рдбреЗрдЯрд╛ рд╕рдВрдЧреНрд░рд╣рд┐рдд рдХреЗрд▓реНрдпрд╛рдиреЗ рдЖрдореНрд╣рд╛рд▓рд╛ рдЕрдиреЗрдХ рдлрд╛рдпрджреЗ рдорд┐рд│рддрд╛рдд. рдЖрддрд╛ рдЖрдореНрд╣реА рдорд╣рддреНрддреНрд╡рд╛рдЪреНрдпрд╛ рдкреНрд░рд╢реНрдирд╛рдВрдЪреА рдЙрддреНрддрд░реЗ рджреЗрдК рд╢рдХрддреЛ рдЬрд╕реЗ рдЖрдордЪреЗ рдЙрддреНрдкрд╛рджрди рдХрд┐рддреА рд▓реЛрдХ рд╡рд╛рдкрд░рддрд╛рдд? рддреБрдордЪрд╛ рд╡рд╛рдкрд░рдХрд░реНрддрд╛ рдЖрдзрд╛рд░ рдХрд╛рд▓рд╛рдВрддрд░рд╛рдиреЗ рд╡рд╛рдврдд рдЖрд╣реЗ рдХрд╛? рдЙрддреНрдкрд╛рджрдирд╛рдЪреНрдпрд╛ рдХреЛрдгрддреНрдпрд╛ рдкреИрд▓реВрдВрд╢реА рд▓реЛрдХ рд╕рд░реНрд╡рд╛рдзрд┐рдХ рд╕рдВрд╡рд╛рдж рд╕рд╛рдзрддрд╛рдд? рдЖрдгрд┐ рдЬрд┐рдереЗ рдирд╕рд╛рд╡реА рддрд┐рдереЗ рддреНрд░реБрдЯреА рдЖрд╣реЗрдд рдХрд╛? рд╣реЗ рдкреНрд░рд╢реНрди рд╕рдВрд╕реНрдереЗрдЪреНрдпрд╛ рд╣рд┐рддрд╛рдЪреЗ рдЕрд╕рддреАрд▓. рдпрд╛ рдкреНрд░рд╢реНрдирд╛рдВрдЪреНрдпрд╛ рдЙрддреНрддрд░рд╛рдВрдордзреВрди рдЙрджреНрднрд╡рд▓реЗрд▓реНрдпрд╛ рдЕрдВрддрд░реНрджреГрд╖реНрдЯреАрдЪреНрдпрд╛ рдЖрдзрд╛рд░рд╛рд╡рд░, рдЖрдореНрд╣реА рдЙрддреНрдкрд╛рджрди рд╕реБрдзрд╛рд░реВ рд╢рдХрддреЛ рдЖрдгрд┐ рд╡рд╛рдкрд░рдХрд░реНрддреНрдпрд╛рдЪреА рдкреНрд░рддрд┐рдмрджреНрдзрддрд╛ рд╡рд╛рдврд╡реВ рд╢рдХрддреЛ.

рдмреАрдо рдпрд╛ рдкреНрд░рдХрд╛рд░рдЪреНрдпрд╛ рд╡реНрдпрд╛рдпрд╛рдорд╛рд╕рд╛рдареА рдЦрд░реЛрдЦрд░ рдЙрдкрдпреБрдХреНрдд рдЖрд╣реЗ рдЖрдгрд┐ рдЗрддрд░ рдЕрдиреЗрдХ рдордиреЛрд░рдВрдЬрдХ рд╡рд╛рдкрд░ рдкреНрд░рдХрд░рдгреЗ рджреЗрдЦреАрд▓ рдЖрд╣реЗрдд. рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде, рддреБрдореНрд╣рд╛рд▓рд╛ рд░рд┐рдЕрд▓ рдЯрд╛рдЗрдордордзреНрдпреЗ рд╕реНрдЯреЙрдХ рдЯрд┐рдХ рдбреЗрдЯрд╛рдЪреЗ рд╡рд┐рд╢реНрд▓реЗрд╖рдг рдХрд░рд╛рдпрдЪреЗ рдЖрд╣реЗ рдЖрдгрд┐ рд╡рд┐рд╢реНрд▓реЗрд╖рдгрд╛рд╡рд░ рдЖрдзрд╛рд░рд┐рдд рд╡реНрдпрд╡рд╣рд╛рд░ рдХрд░рд╛рдпрдЪреЗ рдЖрд╣реЗрдд, рдХрджрд╛рдЪрд┐рдд рддреБрдордЪреНрдпрд╛рдХрдбреЗ рд╡рд╛рд╣рдирд╛рдВрдордзреВрди рдпреЗрдгрд╛рд░рд╛ рд╕реЗрдиреНрд╕рд░ рдбреЗрдЯрд╛ рдЕрд╕реЗрд▓ рдЖрдгрд┐ рддреБрдореНрд╣рд╛рд▓рд╛ рдЯреНрд░реЕрдлрд┐рдХ рдкрд╛рддрд│реАрдЪреА рдЧрдгрдирд╛ рдХрд░рд╛рдпрдЪреА рдЕрд╕реЗрд▓. рдЙрджрд╛рд╣рд░рдгрд╛рд░реНрде, рддреБрдореНрд╣реА рдПрдХ рдЧреЗрдорд┐рдВрдЧ рдХрдВрдкрдиреА рджреЗрдЦреАрд▓ рдЕрд╕реВ рд╢рдХрддрд╛ рдЬреА рд╡рд╛рдкрд░рдХрд░реНрддрд╛ рдбреЗрдЯрд╛ рд╕рдВрдХрд▓рд┐рдд рдХрд░рддреЗ рдЖрдгрд┐ рдореБрдЦреНрдп рдореЗрдЯреНрд░рд┐рдХреНрд╕рдЪрд╛ рдорд╛рдЧреЛрд╡рд╛ рдШреЗрдгреНрдпрд╛рд╕рд╛рдареА рдбреЕрд╢рдмреЛрд░реНрдб рддрдпрд╛рд░ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рд╡рд╛рдкрд░рддреЗ. рдареАрдХ рдЖрд╣реЗ, рд╕рдЬреНрдЬрдирд╛рдВрдиреЛ, рд╣рд╛ рджреБрд╕рд░реНтАНрдпрд╛ рдкреЛрд╕реНрдЯрд╕рд╛рдареА рд╡рд┐рд╖рдп рдЖрд╣реЗ, рд╡рд╛рдЪрд▓реНрдпрд╛рдмрджреНрджрд▓ рдзрдиреНрдпрд╡рд╛рдж, рдЖрдгрд┐ рдЬреНрдпрд╛рдВрдирд╛ рдкреВрд░реНрдг рдХреЛрдб рдкрд╣рд╛рдпрдЪрд╛ рдЖрд╣реЗ рддреНрдпрд╛рдВрдЪреНрдпрд╛рд╕рд╛рдареА рдЦрд╛рд▓реА рдорд╛рдЭреНрдпрд╛ GitHub рдЪреА рд▓рд┐рдВрдХ рдЖрд╣реЗ.

https://github.com/DFoly/User_log_pipeline

рдПрд╡рдвреЗрдЪ. рднрд╛рдЧ рдкрд╣рд┐рд▓рд╛ рд╡рд╛рдЪрд╛.

рд╕реНрддреНрд░реЛрдд: www.habr.com

рдПрдХ рдЯрд┐рдкреНрдкрдгреА рдЬреЛрдбрд╛