Building a streaming data processing pipeline. Part 2

Hello everyone. We are sharing a translation of the final part of this article, prepared specifically for students of the "Data Engineer" course. You can read the first part here.

Apache Beam and DataFlow for real-time pipelines


Setting up Google Cloud

Note: I used Google Cloud Shell to run the pipeline and publish the user log data because I was having trouble running the pipeline in Python 3. Google Cloud Shell uses Python 2, which works better with Apache Beam.

To get the pipeline started, we need to dig into the settings a little. Those of you who have not used GCP before will need to follow the 6 steps outlined on this page.

After that, we need to upload our scripts to Google Cloud Storage and copy them into our Google Cloud Shell. Uploading to Cloud Storage is fairly trivial (a description can be found here). To copy our files, we can open Google Cloud Shell from the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need to copy the files and install the required libraries are listed below.

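# Copy the files from Cloud Storage into the Cloud Shell working directory
gsutil cp gs://<YOUR-BUCKET>/* .
# Install the required libraries
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables (BUCKET is the bare bucket name, without gs://)
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>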

Creating our dataset and table

ืึทืžืึธืœ ืžื™ืจ ื”ืึธื‘ืŸ ื“ื•ืจื›ื’ืขืงืึธื›ื˜ ืึทืœืข ื“ื™ ืกืขื˜ืึทืคึผ ืฉื™ื™ึทื›ื•ืช ืกื˜ืขืคึผืก, ื“ืขืจ ื•ื•ื™ื™ึทื˜ืขืจ ื–ืึทืš ืžื™ืจ ื“ืึทืจืคึฟืŸ ืฆื• ื˜ืึธืŸ ืื™ื– ืฉืึทืคึฟืŸ ืึท ื“ืึทื˜ืึทืกืขื˜ ืื•ืŸ ื˜ื™ืฉ ืื™ืŸ BigQuery. ืขืก ื–ืขื ืขืŸ ืขื˜ืœืขื›ืข ื•ื•ืขื’ืŸ ืฆื• ื˜ืึธืŸ ื“ืึธืก, ืึธื‘ืขืจ ื“ื™ ืกื™ืžืคึผืœืึทืกื˜ ืื™ื– ืฆื• ื ื•ืฆืŸ ื“ื™ Google ืงืœืึธื•ื“ ืงืึทื ืกืึธื•ืœ ื“ื•ืจืš ืขืจืฉื˜ืขืจ ืงืจื™ื™ื™ื˜ื™ื ื’ ืึท ื“ืึทื˜ืึทืกืขื˜. ืื™ืจ ืงืขื ืขืŸ ื ืึธื›ื’ื™ื™ืŸ ื“ื™ ืกื˜ืขืคึผืก ืื•ื ื˜ืŸ ืจื•ื ื’ืฆื• ืฉืึทืคึฟืŸ ืึท ื˜ื™ืฉ ืžื™ื˜ ืึท ืกื˜ืฉืขืžืึท. ืื•ื ื“ื–ืขืจ ื˜ื™ืฉ ื•ื•ืขื˜ ื”ืึธื‘ืŸ 7 ืฉืคืืœื˜ืŸ, ืงืึธืจืึทืกืคึผืึทื ื“ื™ื ื’ ืฆื• ื“ื™ ืงืึทืžืคึผืึธื•ื ืึทื ืฅ ืคื•ืŸ ื™ืขื“ืขืจ ื‘ืึทื ื™ืฆืขืจ ืงืœืึธืฅ. ืคึฟืึทืจ ืงืึทื ื•ื•ื™ื ื™ืึทื ืก, ืžื™ืจ ื•ื•ืขืœืŸ ื“ืขืคื™ื ื™ืจืŸ ืึทืœืข ืฉืคืืœื˜ืŸ ื•ื•ื™ ืกื˜ืจื™ื ื’ืก, ืึทื—ื•ืฅ ืคึฟืึทืจ ื“ื™ ื˜ื™ืžืขืœืึธืงืึทืœ ื‘ื™ื™ึทื˜ืขื•ื•ื“ื™ืง, ืื•ืŸ ื ืขืžืขืŸ ื–ื™ื™ ืœื•ื™ื˜ ื“ื™ ื•ื•ืขืจื™ืึทื‘ืึทืœื– ื•ื•ืึธืก ืžื™ืจ ื“ื–ืฉืขื ืขืจื™ื™ื˜ืึทื“ ืคืจื™ืขืจ. ื“ืขืจ ืื•ื™ืกืœื™ื™ื’ ืคื•ืŸ ืื•ื ื“ื–ืขืจ ื˜ื™ืฉ ื–ืึธืœ ืงื•ืงืŸ ื•ื•ื™ ืื™ืŸ ืคื™ื’ื•ืจืข 3.

Figure 3. Table layout

Publishing user log data

ืคึผื•ื‘ / ืกื•ื‘ ืื™ื– ืึท ืงืจื™ื˜ื™ืฉ ืงืึธืžืคึผืึธื ืขื ื˜ ืคื•ืŸ ืื•ื ื“ื–ืขืจ ืจืขืจื  - ืœื™ื ื™ืข ื•ื•ื™ื™ึทืœ ืขืก ืึทืœืึทื•ื– ืงื™ื™ืคืœ ืคืจื™ื™ึท ืึทืคึผืœืึทืงื™ื™ืฉืึทื ื– ืฆื• ื™ื‘ืขืจื’ืขื‘ืŸ ืžื™ื˜ ื™ืขื“ืขืจ ืื ื“ืขืจืขืจ. ืื™ืŸ ื‘ืึทื–ื•ื ื“ืขืจ, ืขืก ืึทืจื‘ืขื˜ ื•ื•ื™ ืึท ื™ื ื˜ืขืจืžื™ื“ื™ืขืจื™ ื•ื•ืึธืก ืึทืœืึทื•ื– ืื•ื ื“ื– ืฆื• ืฉื™ืงืŸ ืื•ืŸ ื‘ืึทืงื•ืžืขืŸ ืึทืจื˜ื™ืงืœืขืŸ ืฆื•ื•ื™ืฉืŸ ืึทืคึผืœืึทืงื™ื™ืฉืึทื ื–. ื“ืขืจ ืขืจืฉื˜ืขืจ ื–ืึทืš ืžื™ืจ ื“ืึทืจืคึฟืŸ ืฆื• ื˜ืึธืŸ ืื™ื– ืฉืึทืคึฟืŸ ืึท ื˜ืขืžืข. ืคืฉื•ื˜ ื’ื™ื™ืŸ ืฆื• Pub / Sub ืื™ืŸ ื“ื™ ืงืึทื ืกืึธื•ืœ ืื•ืŸ ื’ื™ื˜ CREATE TOPIC.

ื“ืขืจ ืงืึธื“ ืื•ื ื˜ืŸ ืจื•ืคื˜ ืื•ื ื“ื–ืขืจ ืฉืจื™ืคื˜ ืฆื• ื“ื–ืฉืขื ืขืจื™ื™ื˜ ื“ื™ ืงืœืึธืฅ ื“ืึทื˜ืŸ ื“ื™ืคื™ื™ื ื“ ืื•ื™ื‘ืŸ ืื•ืŸ ื“ืขืžืึธืœื˜ ืงืึทื ืขืงืฅ ืื•ืŸ ืกืขื ื“ื– ื“ื™ ืœืึธื’ืก ืฆื• ืคึผื•ื‘ / ืกื•ื‘. ื“ืขืจ ื‘ืœื•ื™ื– ื–ืึทืš ืžื™ืจ ื“ืึทืจืคึฟืŸ ืฆื• ื˜ืึธืŸ ืื™ื– ืฉืึทืคึฟืŸ ืึท ื›ื™ื™ืคืขืฅ PublisherClient, ืกืคึผืขืฆื™ืคื™ืฆื™ืจืŸ ื“ืขื ื“ืจืš ืฆื• ื“ืขืจ ื˜ืขืžืข ื ื™ืฆืŸ ื“ืขื ืื•ืคึฟืŸ topic_path ืื•ืŸ ืจื•ืคืŸ ื“ื™ ืคึฟื•ื ืงืฆื™ืข publish ั topic_path ืื•ืŸ ื“ืึทื˜ืŸ. ื‘ื™ื˜ืข ื˜ืึธืŸ ืึทื– ืžื™ืจ ืึทืจื™ื™ึทื ืคื™ืจ generate_log_line ืคื•ืŸ ืื•ื ื“ื–ืขืจ ืฉืจื™ืคื˜ stream_logs, ืึทื–ื•ื™ ืžืึทื›ืŸ ื–ื™ื›ืขืจ ืึทื– ื“ื™ ื˜ืขืงืขืก ื–ืขื ืขืŸ ืื™ืŸ ื“ืขืจ ื–ืขืœื‘ื™ืงืขืจ ื˜ืขืงืข, ืึทื ื“ืขืจืฉ ืื™ืจ ื•ื•ืขื˜ ื‘ืึทืงื•ืžืขืŸ ืึท ืึทืจื™ื™ึทื ืคื™ืจ ื˜ืขื•ืช. ืžื™ืจ ืงืขื ืขืŸ ื“ืขืจื™ื‘ืขืจ ืœื•ื™ืคืŸ ื“ืขื ื“ื•ืจืš ืื•ื ื“ื–ืขืจ Google ืงืึทื ืกืึธื•ืœ ื ื™ืฆืŸ:

python publish.py

from stream_logs import generate_log_line
import logging
import random
import time

from google.cloud import pubsub_v1


PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)


def publish(publisher, topic, message):
    # Pub/Sub payloads must be bytes, so encode the log line first
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)


def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        # On success, result() is the server-assigned message ID
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        # Wait 1 or 2 seconds before sending the next line
        sleep_time = random.choice(range(1, 3))
        time.sleep(sleep_time)
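publish.py depends on generate_log_line from the stream_logs script written in part one, which is not reproduced here. If you want to exercise the publisher without it, a hypothetical stand-in like the one below can be saved as a local stream_logs.py. It uses only the standard library instead of Faker, and the line layout (an Nginx-style access log line) is an assumption; adjust it to whatever format your part-one script actually emits.

```python
import random
from datetime import datetime

# Hypothetical value pools; the real script draws these from Faker
REQUEST_TYPES = ["GET", "POST", "PUT", "DELETE"]
STATUS_CODES = ["200", "404", "500", "303"]
USER_AGENTS = ["Mozilla/5.0 (X11; Linux x86_64)", "Opera/9.80 (Windows NT 6.1)"]


def generate_log_line(now=None):
    """Return one fake access-log line (stand-in for stream_logs.generate_log_line)."""
    if now is None:
        now = datetime.now()
    remote_addr = ".".join(str(random.randint(1, 254)) for _ in range(4))
    time_local = now.strftime("%d/%b/%Y:%H:%M:%S")
    request = "{} /page/{} HTTP/1.1".format(random.choice(REQUEST_TYPES), random.randint(1, 100))
    status = random.choice(STATUS_CODES)
    body_bytes_sent = random.randint(5, 999)
    referer = "http://www.example.com/page/{}".format(random.randint(1, 100))
    user_agent = random.choice(USER_AGENTS)
    return '{} - - [{}] "{}" {} {} "{}" "{}"'.format(
        remote_addr, time_local, request, status, body_bytes_sent, referer, user_agent)
```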

ื•ื•ื™ ื‘ืึทืœื“ ื•ื•ื™ ื“ื™ ื˜ืขืงืข ืœื•ื™ืคื˜, ืžื™ืจ ืงืขื ืขืŸ ื–ืขืŸ ื“ื™ ืจืขื–ื•ืœื˜ืึทื˜ ืคื•ืŸ ื“ื™ ืงืœืึธืฅ ื“ืึทื˜ืŸ ืฆื• ื“ื™ ืงืึทื ืกืึธื•ืœ, ื•ื•ื™ ื’ืขื•ื•ื™ื–ืŸ ืื™ืŸ ื“ื™ ืคื™ื’ื•ืจ ืื•ื ื˜ืŸ. ื“ืขืจ ืฉืจื™ืคื˜ ื•ื•ืขื˜ ืึทืจื‘ืขื˜ืŸ ืึทื–ื•ื™ ืœืึทื ื’ ื•ื•ื™ ืžื™ืจ ื˜ืึธืŸ ื ื™ื˜ ื ื•ืฆืŸ ืงื˜ืจืœ + Cืฆื• ืคืึทืจืขื ื“ื™ืงืŸ ืขืก.

Figure 4. Output of publish_logs.py

Writing our pipeline code

ืื™ืฆื˜ ืึทื– ืžื™ืจ ื”ืึธื‘ืŸ ืึทืœืฅ ืฆื•ื’ืขื’ืจื™ื™ื˜, ืžื™ืจ ืงืขื ืขืŸ ืึธื ื”ื™ื™ื‘ืŸ ื“ืขื ืฉืคึผืึทืก ื˜ื™ื™ืœ - ืงืึธื“ื™ืจื•ื ื’ ืคื•ืŸ ืื•ื ื“ื–ืขืจ ืจืขืจื  - ืœื™ื ื™ืข ืžื™ื˜ ืฉื˜ืจืึทืœ ืื•ืŸ ืคึผื™ื˜ื”ืึธืŸ. ืฆื• ืฉืึทืคึฟืŸ ืึท ืฉื˜ืจืึทืœ ืจืขืจื  - ืœื™ื ื™ืข, ืžื™ืจ ื“ืึทืจืคึฟืŸ ืฆื• ืฉืึทืคึฟืŸ ืึท ืจืขืจื  - ืœื™ื ื™ืข ื›ื™ื™ืคืขืฅ (ืคึผ). ืึทืžืึธืœ ืžื™ืจ ื”ืึธื‘ืŸ ื‘ืืฉืืคืŸ ืึท ืจืขืจื  - ืœื™ื ื™ืข ื›ื™ื™ืคืขืฅ, ืžื™ืจ ืงืขื ืขืŸ ืฆื•ืœื™ื™ื’ืŸ ืงื™ื™ืคืœ ืคืึทื ื’ืงืฉืึทื ื– ืื™ื™ื ืขืจ ื ืึธืš ื“ืขื ืื ื“ืขืจืŸ ื ื™ืฆืŸ ื“ื™ ืึธืคึผืขืจืึทื˜ืึธืจ pipe (|). ืื™ืŸ ืึทืœื’ืขืžื™ื™ืŸ, ื“ื™ ื•ื•ืึธืจืงืคืœืึธื•ื• ืงื•ืงื˜ ื•ื•ื™ ื“ื™ ื‘ื™ืœื“ ืื•ื ื˜ืŸ.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
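To see why this chaining syntax works at all, here is a toy model in plain Python. It is not Beam's implementation, only an illustration of the idea: a transform object defines __ror__, so data | transform applies it, and __rrshift__, so 'Label' >> transform attaches a label. Beam's PTransform overloads the same operators, but it applies transforms lazily to PCollections, whereas this sketch runs them eagerly on lists. Transform and Map here are hypothetical toy classes.

```python
class Transform:
    """Toy stand-in for a Beam transform: wraps a function applied to each element."""

    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label

    def __rrshift__(self, label):
        # 'Label' >> transform: str has no >> for this, so Python calls this method
        return Transform(self.fn, label)

    def __ror__(self, elements):
        # elements | transform: list has no | for this, so Python calls this method
        print("applying", self.label or self.fn.__name__)
        return [self.fn(e) for e in elements]


def Map(fn):
    return Transform(fn)


# >> binds tighter than |, so each label is attached before the transform is applied
result = (["a", "b"]
          | "Upper" >> Map(str.upper)
          | "Exclaim" >> Map(lambda s: s + "!"))
```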

ืื™ืŸ ืื•ื ื“ื–ืขืจ ืงืึธื“, ืžื™ืจ ื•ื•ืขืœืŸ ืžืึทื›ืŸ ืฆื•ื•ื™ื™ ืžื ื”ื’ ืคืึทื ื’ืงืฉืึทื ื–. ืคึฟื•ื ืงืฆื™ืข regex_clean, ื•ื•ืึธืก ืกืงืึทื ื– ื“ื™ ื“ืึทื˜ืŸ ืื•ืŸ ืจื™ื˜ืจื™ื•ื•ื– ื“ื™ ืงืึธืจืึทืกืคึผืึทื ื“ื™ื ื’ ืจื•ื“ืขืจืŸ ื‘ืื–ื™ืจื˜ ืื•ื™ืฃ ื“ื™ PATTERNS ืจืฉื™ืžื” ื ื™ืฆืŸ ื“ื™ ืคึฟื•ื ืงืฆื™ืข re.search. ื“ื™ ืคึฟื•ื ืงืฆื™ืข ืงืขืจื˜ ืึท ืงืึธืžืข ืืคื’ืขืฉื™ื™ื“ื˜ ืฉื˜ืจื™ืงืœ. ืื•ื™ื‘ ืื™ืจ ื–ืขื ื˜ ื ื™ืฉื˜ ืึท ืจืขื’ื•ืœืขืจ ืื•ื™ืกื“ืจื•ืง ืžื•ืžื—ื”, ืื™ืš ืจืขืงืึธืžืขื ื“ื™ืจืŸ ืฆื• ืงืึธื ื˜ืจืึธืœื™ืจืŸ ื“ืขื ื˜ื•ื˜ืึธืจื™ืึทืœ ืื•ืŸ ืคื™ืจ ืื™ืŸ ืึท ื ืึธื˜ืขืคึผืึทื“ ืฆื• ืงืึธื ื˜ืจืึธืœื™ืจืŸ ื“ืขื ืงืึธื“. ื ืึธืš ื“ืขื, ืžื™ืจ ื“ืขืคื™ื ื™ืจืŸ ืึท ืžื ื”ื’ ืคึผืึทืจื“ืึธ ืคึฟื•ื ืงืฆื™ืข ื’ืขืจื•ืคืŸ ืฉืคึผืึทืœื˜ืŸ, ื•ื•ืึธืก ืื™ื– ืึท ื•ื•ืขืจื™ื™ื™ืฉืึทืŸ ืคื•ืŸ ื“ื™ ืฉื˜ืจืึทืœ ื™ื‘ืขืจืžืึทื›ืŸ ืคึฟืึทืจ ืคึผืึทืจืึทืœืขืœ ืคึผืจืึทืกืขืกื™ื ื’. ืื™ืŸ ืคึผื™ื˜ื”ืึธืŸ, ื“ืึธืก ืื™ื– ื’ืขื˜ืืŸ ืื™ืŸ ืึท ืกืคึผืขืฆื™ืขืœ ื•ื•ืขื’ - ืžื™ืจ ืžื•ื–ืŸ ืžืึทื›ืŸ ืึท ืงืœืึทืก ื•ื•ืึธืก ื™ื ื›ืขืจืึทืฅ ืคื•ืŸ ื“ื™ DoFn Beam ืงืœืึทืก. ื“ื™ ืกืคึผืœื™ื˜ ืคึฟื•ื ืงืฆื™ืข ื ืขืžื˜ ื“ื™ ืคึผืึทืจืกื˜ ืจื•ื“ืขืจืŸ ืคื•ืŸ ื“ื™ ืคืจื™ืขืจื“ื™ืงืข ืคึฟื•ื ืงืฆื™ืข ืื•ืŸ ืงืขืจื˜ ืึท ืจืฉื™ืžื” ืคื•ืŸ ื“ื™ืงืฉืึทื ืขืจื™ื– ืžื™ื˜ ืฉืœื™ืกืœืขืŸ ืงืึธืจืึทืกืคึผืึทื ื“ื™ื ื’ ืฆื• ื“ื™ ื–ื™ื™ึทืœ ื ืขืžืขืŸ ืื™ืŸ ืื•ื ื“ื–ืขืจ BigQuery ื˜ื™ืฉ. ืขืก ืื™ื– ืขืคึผืขืก ืฆื• ื˜ืึธืŸ ื•ื•ืขื’ืŸ ื“ืขื ืคึฟื•ื ืงืฆื™ืข: ืื™ืš ื”ืื˜ ืฆื• ืึทืจื™ื™ึทื ืคื™ืจ datetime ืื™ืŸ ืึท ืคึฟื•ื ืงืฆื™ืข ืฆื• ืžืึทื›ืŸ ืขืก ืึทืจื‘ืขื˜. ืื™ืš ืื™ื– ื’ืขื•ื•ืขืŸ ื’ืขื˜ื™ื ื’ ืึท ืึทืจื™ื™ึทื ืคื™ืจ ื˜ืขื•ืช ืื™ืŸ ื“ื™ ืึธื ื”ื™ื™ื‘ ืคื•ืŸ ื“ืขืจ ื˜ืขืงืข, ื•ื•ืึธืก ืื™ื– ื’ืขื•ื•ืขืŸ ื˜ืฉื•ื“ื ืข. 
This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the batch DataFlow job and the streaming DataFlow job is given below. The only difference between the batch and streaming code is that the batch job reads the CSV from src_path with Beam's ReadFromText function, while the streaming job reads from the Pub/Sub topic.
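A possible alternative to running seven separate searches, shown here as a swapped-in technique rather than the author's approach, is a single regular expression with named groups that extracts every column in one pass and directly returns the dictionary WriteToBigQuery expects. It assumes the Nginx-style access log layout (IP, two dashes, a bracketed timestamp with an optional zone offset, a quoted request, status, byte count, and a quoted referer and user agent); parse_log_line is a hypothetical name. In Beam it could replace the regex_clean and Split steps, e.g. beam.Map(parse_log_line) followed by beam.Filter(lambda row: row is not None).

```python
import re
from datetime import datetime

# Assumed layout: 1.2.3.4 - - [30/Apr/2019:21:11:42 +0000] "GET /x HTTP/1.1" 200 512 "ref" "agent"
LOG_PATTERN = re.compile(
    r'(?P<remote_addr>\S+) \S+ \S+ '
    r'\[(?P<timelocal>[^\]]+)\] '
    r'"(?P<request_type>[^"]*)" '
    r'(?P<status>\d{3}) (?P<body_bytes_sent>\d+|-) '
    r'"(?P<http_referer>[^"]*)" "(?P<http_user_agent>[^"]*)"'
)


def parse_log_line(line):
    """Return a dict keyed by the BigQuery column names, or None if the line does not match."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    row = match.groupdict()
    # Drop an optional zone offset such as "+0000" before parsing the timestamp
    stamp = row["timelocal"].split()[0]
    parsed = datetime.strptime(stamp, "%d/%b/%Y:%H:%M:%S")
    row["timelocal"] = parsed.strftime("%Y-%m-%d %H:%M:%S")
    return row
```

Named groups also keep the column mapping in one place, so a reordered pattern cannot silently shift values into the wrong column.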

Batch DataFlow job (batch processing)

import logging
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

PROJECT = 'user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"


def regex_clean(data):
    # One pattern per column, in the order Split() reads them back
    # (the referer pattern comes before the user-agent pattern)
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+', r'\"[A-Z][a-z]+']
    result = []
    for pattern in PATTERNS:
        reg_match = re.search(pattern, data)
        # re.search returns None when nothing matches; keep a placeholder
        # so every column stays at its expected position
        result.append(reg_match.group() if reg_match else " ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    return ','.join(result)


class Split(beam.DoFn):

    def process(self, element):
        # Imported here because Dataflow workers do not see the main script's
        # module-level imports unless --save_main_session is set
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        # Keys must match the BigQuery column names
        yield {
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6],
        }


def main():
    p = beam.Pipeline(options=PipelineOptions())

    (p
     | 'ReadData' >> beam.io.ReadFromText(src_path)
     | 'clean address' >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    p.run()


if __name__ == '__main__':
    # setLevel() returns None, so there is nothing useful to assign here
    logging.getLogger().setLevel(logging.INFO)
    main()
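WriteToBigQuery expects each element to be a dictionary whose keys are the table's column names, so a typo in a key is only discovered when the write fails. A cheap safeguard, sketched here as an optional addition (check_row is a hypothetical helper, not part of Beam), is to compare each row's keys with the schema string. You could call it in a beam.Map step between ParseCSV and WriteToBigQuery, or in a unit test of Split. Note that it checks key names only, not whether values landed in the right column.

```python
SCHEMA = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, '
          'body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING')

# Column names in schema order: ['remote_addr', 'timelocal', ...]
COLUMNS = [field.split(":")[0].strip() for field in SCHEMA.split(",")]


def check_row(row, columns=COLUMNS):
    """Return the row unchanged if its keys match the table columns, else raise ValueError."""
    missing = [c for c in columns if c not in row]
    extra = [k for k in row if k not in columns]
    if missing or extra:
        raise ValueError("row does not match schema: missing={}, extra={}".format(missing, extra))
    return row
```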

Streaming DataFlow job (stream processing)

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


PROJECT = "user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):
    # One pattern per column, in the order Split() reads them back
    # (the referer pattern comes before the user-agent pattern)
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+', r'\"[A-Z][a-z]+']
    result = []
    for pattern in PATTERNS:
        reg_match = re.search(pattern, data)
        # Keep a placeholder when nothing matches so column positions stay aligned
        result.append(reg_match.group() if reg_match else " ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    return ','.join(result)


class Split(beam.DoFn):

    def process(self, element):
        # Imported here so Dataflow workers can resolve it without --save_main_session
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        # Same column mapping as the batch job: status is field 3, bytes field 4
        yield {
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6],
        }


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic", default=TOPIC)
    # Arguments argparse does not know (--runner, --streaming, ...) are passed on to Beam
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (p
     | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
     | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
     | "Clean Data" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()

ืกื˜ืึทืจื˜ื™ื ื’ ื“ื™ ืงืึทื ื•ื•ื™ื™ืขืจ

We can run the pipeline in several different ways. If we want, we can simply run it locally from a terminal while logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we are going to run it on DataFlow. We can do that with the command below after setting the following required parameters:

  • project - ID ืคื•ืŸ ื“ื™ื™ืŸ GCP ืคึผืจื•ื™ืขืงื˜.
  • runner ืื™ื– ืึท ืจืขืจื  - ืœื™ื ื™ืข ื•ื•ืึธืก ื•ื•ืขื˜ ืึทื ืึทืœื™ื™ื– ื“ื™ื™ืŸ ืคึผืจืึธื’ืจืึทื ืื•ืŸ ื‘ื•ื™ืขืŸ ื“ื™ื™ืŸ ืจืขืจื  - ืœื™ื ื™ืข. ืฆื• ืœื•ื™ืคืŸ ืื™ืŸ ื“ื™ ื•ื•ืึธืœืงืŸ, ืื™ืจ ืžื•ื–ืŸ ืกืคึผืขืฆื™ืคื™ืฆื™ืจืŸ ืึท DataflowRunner.
  • staging_location - ื“ืขืจ ื“ืจืš ืฆื• ื“ื™ ืงืœืึธื•ื“ ื“ืึทื˜ืึทืคืœืึธื•ื• ื•ื•ืึธืœืงืŸ ืกื˜ืึธืจื™ื“ื–ืฉ ืคึฟืึทืจ ื™ื ื“ืขืงืกื™ื ื’ ืงืึธื“ ืคึผืึทืงืึทื“ื–ืฉืึทื– ื“ืืจืฃ ื“ื•ืจืš ื“ื™ ืคึผืจืึทืกืขืกืขืจื– ื•ื•ืึธืก ื“ื•ืจื›ืคื™ืจืŸ ื“ื™ ืึทืจื‘ืขื˜.
  • temp_location - ื“ืจืš ืฆื• ื“ื™ ืงืœืึธื•ื“ ื“ืึทื˜ืึทืคืœืึธื•ื• ื•ื•ืึธืœืงืŸ ืกื˜ืึธืจื™ื“ื–ืฉ ืคึฟืึทืจ ืกื˜ืึธืจื™ื ื’ ืฆื™ื™ึทื˜ื•ื•ื™ื™ึทืœื™ืง ืึทืจื‘ืขื˜ ื˜ืขืงืขืก ื‘ืืฉืืคืŸ ื‘ืฉืขืช ื“ื™ ืจืขืจื  - ืœื™ื ื™ืข ืื™ื– ืคืœื™ืกื ื“ื™ืง.
  • streaming

python main_pipeline_stream.py \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp \
  --staging_location gs://$BUCKET/staging \
  --streaming
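The same options can also be assembled in Python, which is handy if you launch jobs from a script rather than the shell. In this sketch, dataflow_flags is a hypothetical helper; the flag names are the ones from the command above, and the gs:// prefix assumes the bucket argument is a bare bucket name, as with BUCKET in the setup commands. The returned list is in the command-line form that PipelineOptions accepts.

```python
def dataflow_flags(project, bucket, streaming=True, runner="DataflowRunner"):
    """Build the command-line style flags used to launch the pipeline on Dataflow."""
    flags = [
        "--runner", runner,
        "--project", project,
        "--temp_location", "gs://{}/tmp".format(bucket),
        "--staging_location", "gs://{}/staging".format(bucket),
    ]
    if streaming:
        flags.append("--streaming")
    return flags
```

With Beam installed, the list would be used as PipelineOptions(dataflow_flags("user-logs-237110", "<YOUR-BUCKET>")).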

While the command is running, we can go to the DataFlow tab in the Google console and view our pipeline. When we click on the pipeline, we should see something similar to Figure 4. For debugging, it can be very helpful to go to Logs and then to Stackdriver to see detailed logs. This helped me resolve pipeline problems on a number of occasions.

Figure 4: Beam pipeline

Accessing our data in BigQuery

So we should now have a pipeline running, with data flowing into our table. To check this, we can go to BigQuery and look at the data. After running the query below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can carry out further analysis, share the data with colleagues, and start answering business questions.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
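The same check can be scripted with the google-cloud-bigquery client. A minimal sketch, assuming the library is installed and credentials are configured; preview_query and fetch_preview are hypothetical helper names, and the table ID is the one used throughout this article. Without an ORDER BY clause, BigQuery returns an arbitrary set of up to limit rows rather than a guaranteed "first" set.

```python
def preview_query(table_id, limit=10):
    """Standard SQL returning up to `limit` rows of a fully qualified table."""
    if int(limit) <= 0:
        raise ValueError("limit must be positive")
    return "SELECT * FROM `{}` LIMIT {}".format(table_id, int(limit))


def fetch_preview(table_id, limit=10):
    """Run the preview query and return the rows as dicts (needs google-cloud-bigquery)."""
    from google.cloud import bigquery  # imported lazily; only needed to hit BigQuery

    client = bigquery.Client()
    rows = client.query(preview_query(table_id, limit)).result()
    return [dict(row.items()) for row in rows]
```

For example, fetch_preview("user-logs-237110.userlogs.logdata") runs the same query as the console.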

Figure 5: BigQuery

ืกืึธืฃ

ืžื™ืจ ื”ืึธืคืŸ ืึทื– ื“ืขื ืคึผืึธืกื˜ืŸ ืกืขืจื•ื•ืขืก ื•ื•ื™ ืึท ื ื•ืฆื™ืง ื‘ื™ื™ืฉืคึผื™ืœ ืคื•ืŸ ืงืจื™ื™ื™ื˜ื™ื ื’ ืึท ืกื˜ืจื™ืžื™ื ื’ ื“ืึทื˜ืŸ ืจืขืจื  - ืœื™ื ื™ืข, ื•ื•ื™ ื’ืขื–ื•ื ื˜ ื•ื•ื™ ืฆื• ื’ืขืคึฟื™ื ืขืŸ ื•ื•ืขื’ืŸ ืฆื• ืžืึทื›ืŸ ื“ืึทื˜ืŸ ืžืขืจ ืฆื•ื˜ืจื™ื˜ืœืขืš. ืกื˜ืึธืจื™ื ื’ ื“ืึทื˜ืŸ ืื™ืŸ ื“ืขื ืคึฟืึธืจืžืึทื˜ ื’ื™ื˜ ืื•ื ื“ื– ืคื™ืœืข ืึทื“ื•ื•ืึทื ื˜ื™ื“ื–ืฉื™ื–. ืื™ืฆื˜ ืžื™ืจ ืงืขื ืขืŸ ืึธื ื”ื™ื™ื‘ืŸ ืขื ื˜ืคึฟืขืจืŸ ื•ื•ื™ื›ื˜ื™ืง ืคึฟืจืื’ืŸ ื•ื•ื™ ื•ื•ื™ ืคื™ืœืข ืžืขื ื˜ืฉืŸ ื ื•ืฆืŸ ืื•ื ื“ื–ืขืจ ืคึผืจืึธื“ื•ืงื˜? ืื™ื– ื“ื™ื™ืŸ ื‘ืึทื ื™ืฆืขืจ ื‘ืึทื–ืข ื•ื•ืึทืงืกืŸ ืื™ื‘ืขืจ ืฆื™ื™ึทื˜? ื•ื•ืึธืก ืึทืกืคึผืขืงืฅ ืคื•ืŸ ื“ื™ ืคึผืจืึธื“ื•ืงื˜ ื˜ืึธืŸ ืžืขื ื˜ืฉืŸ ื™ื ื˜ืขืจืึทืงื˜ ืžื™ื˜ ื“ื™ ืžืขืจืกื˜? ืื•ืŸ ื–ืขื ืขืŸ ื“ืึธืจื˜ ืขืจืจืึธืจืก ื•ื•ื• ืขืก ื–ืึธืœ ื ื™ื˜ ื–ื™ื™ืŸ? ื“ืืก ื–ืขื ืขืŸ ื“ื™ ืคืจืื’ืขืก ื•ื•ืึธืก ื•ื•ืขื˜ ื–ื™ื™ืŸ ืคื•ืŸ ืื™ื ื˜ืขืจืขืก ืฆื• ื“ืขืจ ืึธืจื’ืึทื ื™ื–ืึทืฆื™ืข. ื‘ืึทื–ื™ืจื˜ ืื•ื™ืฃ ื“ื™ ื™ื ืกื™ื™ืฅ ื•ื•ืึธืก ืงื•ืžืขืŸ ืคึฟื•ืŸ ื“ื™ ืขื ื˜ืคึฟืขืจืก ืฆื• ื“ื™ ืคึฟืจืื’ืŸ, ืžื™ืจ ืงืขื ืขืŸ ืคึฟืึทืจื‘ืขืกืขืจืŸ ื“ื™ ืคึผืจืึธื“ื•ืงื˜ ืื•ืŸ ืคืึทืจื’ืจืขืกืขืจืŸ ื‘ืึทื ื™ืฆืขืจ ื‘ืึทืฉื˜ืขืœื•ื ื’.

ืฉื˜ืจืึทืœ ืื™ื– ื˜ืึทืงืข ื ื•ืฆื™ืง ืคึฟืึทืจ ื“ืขื ื˜ื™ืคึผ ืคื•ืŸ ื’ืขื ื™ื˜ื•ื ื’ ืื•ืŸ ื”ืื˜ ืึท ื ื•ืžืขืจ ืคื•ืŸ ืื ื“ืขืจืข ื˜ืฉื™ืงืึทื•ื•ืข ื ื•ืฆืŸ ืงืึทืกืขืก. ืคึฟืึทืจ ื‘ื™ื™ึทืฉืคึผื™ืœ, ืื™ืจ ืงืขืŸ ื•ื•ืขืœืŸ ืฆื• ืคื•ื ืึทื ื“ืขืจืงืœื™ื™ึทื‘ืŸ ืœืึทื’ืขืจ ื˜ื™ืงืขืŸ ื“ืึทื˜ืŸ ืื™ืŸ ืคืึทืงื˜ื™ืฉ ืฆื™ื™ื˜ ืื•ืŸ ืžืึทื›ืŸ ื˜ืจื™ื™ื“ื– ื‘ืื–ื™ืจื˜ ืื•ื™ืฃ ืึทื ืึทืœื™ืกื™ืก, ื˜ืึธืžืขืจ ืื™ืจ ื”ืึธื‘ืŸ ืกืขื ืกืขืจ ื“ืึทื˜ืŸ ืคึฟื•ืŸ ื•ื•ืขื”ื™ืงืœืขืก ืื•ืŸ ืื™ืจ ื•ื•ื™ืœืŸ ืฆื• ืจืขื›ืขื ืขืŸ ื“ื™ ื—ืฉื‘ื•ื ื•ืช ืคื•ืŸ ืคืึทืจืงืขืจ ืžื“ืจื’ื”. ืื™ืจ ืงืขืŸ ืื•ื™ืš, ืœืžืฉืœ, ื–ื™ื™ืŸ ืึท ื’ื™ื™ืžื™ื ื’ ืคื™ืจืžืข ื•ื•ืึธืก ืงืึทืœืขืงืฅ ื‘ืึทื ื™ืฆืขืจ ื“ืึทื˜ืŸ ืื•ืŸ ื ื™ืฆื˜ ืขืก ืฆื• ืฉืึทืคึฟืŸ ื“ืึทืฉื‘ืึธืจื“ื– ืฆื• ืฉืคึผื•ืจ ืฉืœื™ืกืœ ืžืขื˜ืจื™ืงืก. ืึธื•ืงื™ื™, ืจื‘ื•ืชื™, ื“ืึธืก ืื™ื– ืึท ื˜ืขืžืข ืคึฟืึทืจ ืืŸ ืื ื“ืขืจ ืคึผืึธืกื˜ืŸ, ื“ืึทื ืงืขืŸ ืคึฟืึทืจ ืœื™ื™ืขื ืขืŸ, ืื•ืŸ ืคึฟืึทืจ ื“ื™ ื•ื•ืืก ื•ื•ื™ืœืŸ ืฆื• ื–ืขืŸ ื“ื™ ืคื•ืœ ืงืึธื“, ืื•ื ื˜ืŸ ืื™ื– ื“ื™ ืœื™ื ืง ืฆื• ืžื™ื™ืŸ ื’ื™ื˜ื”ื•ื‘.

https://github.com/DFoly/User_log_pipeline

ื“ืึธืก ืื™ื– ืึทืœืข. ืœื™ื™ืขื ืขืŸ ื˜ื™ื™ืœ ืื™ื™ืŸ.

Source: www.habr.com
