เช…เชฎเซ‡ เชธเซเชŸเซเชฐเซ€เชฎ เชกเซ‡เชŸเชพ เชชเซเชฐเซ‹เชธเซ‡เชธเชฟเช‚เช— เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชฌเชจเชพเชตเซ€เช เช›เซ€เช. เชญเชพเช— 2

เช•เซ‡เชฎ เช›เซ‹ เชฌเชงเชพ. เช…เชฎเซ‡ เชฒเซ‡เช–เชจเชพ เช…เช‚เชคเชฟเชฎ เชญเชพเช—เชจเซ‹ เช…เชจเซเชตเชพเชฆ เชถเซ‡เชฐ เช•เชฐเซ€ เชฐเชนเซเชฏเชพ เช›เซ€เช, เชœเซ‡ เช–เชพเชธ เช•เชฐเซ€เชจเซ‡ เช•เซ‹เชฐเซเชธเชจเชพ เชตเชฟเชฆเซเชฏเชพเชฐเซเชฅเซ€เช“ เชฎเชพเชŸเซ‡ เชคเซˆเชฏเชพเชฐ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซเชฏเซ‹ เช›เซ‡. เชกเซ‡เชŸเชพ เชเชจเซเชœเชฟเชจเชฟเชฏเชฐ. เชคเชฎเซ‡ เชชเซเชฐเชฅเชฎ เชญเชพเช— เชตเชพเช‚เชšเซ€ เชถเช•เซ‹ เช›เซ‹ เช…เชนเซ€เช‚.

เชฐเซ€เช…เชฒ-เชŸเชพเช‡เชฎ เชชเชพเช‡เชชเชฒเชพเช‡เชจเซเชธ เชฎเชพเชŸเซ‡ เช…เชชเชพเชšเซ‡ เชฌเซ€เชฎ เช…เชจเซ‡ เชกเซ‡เชŸเชพเชซเซเชฒเซ‹

เช…เชฎเซ‡ เชธเซเชŸเซเชฐเซ€เชฎ เชกเซ‡เชŸเชพ เชชเซเชฐเซ‹เชธเซ‡เชธเชฟเช‚เช— เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชฌเชจเชพเชตเซ€เช เช›เซ€เช. เชญเชพเช— 2

Google Cloud เชธเซ‡เชŸ เช•เชฐเซ€ เชฐเชนเซเชฏเชพเช‚ เช›เซ€เช

เชจเซ‹เช‚เชง: เชฎเซ‡เช‚ เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชšเชฒเชพเชตเชตเชพ เช…เชจเซ‡ เช•เชธเซเชŸเชฎ เชฒเซ‹เช— เชกเซ‡เชŸเชพ เชชเซเชฐเช•เชพเชถเชฟเชค เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ Google เช•เซเชฒเชพเช‰เชก เชถเซ‡เชฒเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซเชฏเซ‹ เช›เซ‡ เช•เชพเชฐเชฃ เช•เซ‡ เชฎเชจเซ‡ เชชเชพเชฏเชฅเซ‹เชจ 3 เชฎเชพเช‚ เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชšเชฒเชพเชตเชตเชพเชฎเชพเช‚ เชฎเซเชถเซเช•เซ‡เชฒเซ€ เช†เชตเซ€ เชฐเชนเซ€ เชนเชคเซ€. Google เช•เซเชฒเชพเช‰เชก เชถเซ‡เชฒ เชชเชพเชฏเชฅเซ‹เชจ 2 เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ‡ เช›เซ‡, เชœเซ‡ เช…เชชเชพเชšเซ‡ เชฌเซ€เชฎ เชธเชพเชฅเซ‡ เชตเชงเซ เชธเซเชธเช‚เช—เชค เช›เซ‡.

เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชถเชฐเซ‚ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡, เช†เชชเชฃเซ‡ เชธเซ‡เชŸเชฟเช‚เช—เซเชธเชฎเชพเช‚ เชฅเซ‹เชกเซเช‚ เช–เซ‹เชฆเชตเซเช‚ เชชเชกเชถเซ‡. เชคเชฎเชพเชฐเชพเชฎเชพเช‚เชฅเซ€ เชœเซ‡เชฎเชฃเซ‡ เช…เช—เชพเช‰ GCP เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซเชฏเซ‹ เชจเชฅเซ€, เชคเชฎเชพเชฐเซ‡ เช†เชฎเชพเช‚ เชฆเชฐเซเชถเชพเชตเซ‡เชฒ เชจเซ€เชšเซ‡เชจเชพ 6 เชชเช—เชฒเชพเช‚เชจเซ‡ เช…เชจเซเชธเชฐเชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เชชเชกเชถเซ‡ เชชเซƒเชทเซเช .

เช† เชชเช›เซ€, เช…เชฎเชพเชฐเซ‡ เช…เชฎเชพเชฐเซ€ เชธเซเช•เซเชฐเชฟเชชเซเชŸเซเชธเชจเซ‡ Google Cloud Storage เชชเชฐ เช…เชชเชฒเซ‹เชก เช•เชฐเชตเชพเชจเซ€ เช…เชจเซ‡ เชคเซ‡เชจเซ‡ เช…เชฎเชพเชฐเชพ Google Cloud Shel เชชเชฐ เช•เซ‰เชชเชฟ เช•เชฐเชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เชชเชกเชถเซ‡. เช•เซเชฒเชพเช‰เชก เชธเซเชŸเซ‹เชฐเซ‡เชœ เชชเชฐ เช…เชชเชฒเซ‹เชก เช•เชฐเชตเซเช‚ เชเช•เชฆเชฎ เชคเซเชšเซเช› เช›เซ‡ (เชตเชฐเซเชฃเชจ เชฎเชณเซ€ เชถเช•เซ‡ เช›เซ‡ เช…เชนเซ€เช‚). เช…เชฎเชพเชฐเซ€ เชซเชพเช‡เชฒเซ‹เชจเซ€ เชจเช•เชฒ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡, เช…เชฎเซ‡ เชจเซ€เชšเซ‡ เช†เช•เซƒเชคเชฟ 2 เชฎเชพเช‚ เชกเชพเชฌเซ€ เชฌเชพเชœเซเชจเชพ เชชเซเชฐเชฅเชฎ เช†เช‡เช•เซ‹เชจ เชชเชฐ เช•เซเชฒเชฟเช• เช•เชฐเซ€เชจเซ‡ เชŸเซ‚เชฒเชฌเชพเชฐเชฎเชพเช‚เชฅเซ€ Google Cloud Shel เช–เซ‹เชฒเซ€ เชถเช•เซ€เช เช›เซ€เช.

เช…เชฎเซ‡ เชธเซเชŸเซเชฐเซ€เชฎ เชกเซ‡เชŸเชพ เชชเซเชฐเซ‹เชธเซ‡เชธเชฟเช‚เช— เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชฌเชจเชพเชตเซ€เช เช›เซ€เช. เชญเชพเช— 2
เชšเชฟเชคเซเชฐ 2

เชซเชพเชˆเชฒเซ‹เชจเซ€ เชจเช•เชฒ เช•เชฐเชตเชพ เช…เชจเซ‡ เชœเชฐเซ‚เชฐเซ€ เชฒเชพเชˆเชฌเซเชฐเซ‡เชฐเซ€เช“ เชˆเชจเซเชธเซเชŸเซ‹เชฒ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เช†เชชเชฃเชจเซ‡ เชœเซ‡ เช†เชฆเซ‡เชถเซ‹เชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡ เชคเซ‡ เชจเซ€เชšเซ‡ เชธเซ‚เชšเชฟเชฌเชฆเซเชง เช›เซ‡.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

เช…เชฎเชพเชฐเชพ เชกเซ‡เชŸเชพเชฌเซ‡เช เช…เชจเซ‡ เชŸเซ‡เชฌเชฒ เชฌเชจเชพเชตเซ€ เชฐเชนเซเชฏเชพ เช›เซ€เช

เชเช•เชตเชพเชฐ เช…เชฎเซ‡ เชธเซ‡เชŸเช…เชช เชธเช‚เชฌเช‚เชงเชฟเชค เชคเชฎเชพเชฎ เชชเช—เชฒเชพเช‚เช“ เชชเซ‚เชฐเซเชฃ เช•เชฐเซ€ เชฒเซ€เชงเชพ เชชเช›เซ€, เช…เชฎเซ‡ เช†เช—เชณเชจเซเช‚ เช•เชพเชฎ BigQuery เชฎเชพเช‚ เชกเซ‡เชŸเชพเชธเซ‡เชŸ เช…เชจเซ‡ เชŸเซ‡เชฌเชฒ เชฌเชจเชพเชตเชตเชพเชจเซ€ เช›เซ‡. เช† เช•เชฐเชตเชพเชจเซ€ เช˜เชฃเซ€ เชฐเซ€เชคเซ‹ เช›เซ‡, เชชเชฐเช‚เชคเซ เชธเซŒเชฅเซ€ เชธเชฐเชณ เช เช›เซ‡ เช•เซ‡ เชชเซเชฐเชฅเชฎ เชกเซ‡เชŸเชพเชธเซ‡เชŸ เชฌเชจเชพเชตเซ€เชจเซ‡ Google Cloud เช•เชจเซเชธเซ‹เชฒเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเชตเซ‹. เชคเชฎเซ‡ เชจเซ€เชšเซ‡เชจเชพ เชชเช—เชฒเชพเช‚เชจเซ‡ เช…เชจเซเชธเชฐเซ€ เชถเช•เซ‹ เช›เซ‹ เช•เชกเซ€เชธเซเช•เซ€เชฎเชพ เชธเชพเชฅเซ‡ เชŸเซ‡เชฌเชฒ เชฌเชจเชพเชตเชตเชพ เชฎเชพเชŸเซ‡. เช…เชฎเชพเชฐเชพ เชŸเซ‡เชฌเชฒ เชนเชถเซ‡ 7 เช•เซ‰เชฒเชฎ, เชฆเชฐเซ‡เช• เชตเชชเชฐเชพเชถเช•เชฐเซเชคเชพ เชฒเซ‹เช—เชจเชพ เช˜เชŸเช•เซ‹เชจเซ‡ เช…เชจเซเชฐเซ‚เชช. เชธเช—เชตเชก เชฎเชพเชŸเซ‡, เช…เชฎเซ‡ เชŸเชพเชˆเชฎเชฒเซ‹เช•เชฒ เชตเซ‡เชฐเซ€เชเชฌเชฒ เชธเชฟเชตเชพเชฏเชจเชพ เชคเชฎเชพเชฎ เช•เซ‰เชฒเชฎเซเชธเชจเซ‡ เชธเซเชŸเซเชฐเชฟเช‚เช— เชคเชฐเซ€เช•เซ‡ เชตเซเชฏเชพเช–เซเชฏเชพเชฏเชฟเชค เช•เชฐเซ€เชถเซเช‚ เช…เชจเซ‡ เช…เชฎเซ‡ เช…เช—เชพเช‰ เชœเชจเชฐเซ‡เชŸ เช•เชฐเซ‡เชฒเชพ เชตเซ‡เชฐเชฟเชฏเซ‡เชฌเชฒเซเชธ เช…เชจเซเชธเชพเชฐ เชคเซ‡เชฎเชจเซ‡ เชจเชพเชฎ เช†เชชเซ€เชถเซเช‚. เช…เชฎเชพเชฐเชพ เช•เซ‹เชทเซเชŸเช•เชจเซเช‚ เชฒเซ‡เช†เช‰เชŸ เช†เช•เซƒเชคเชฟ 3 เชœเซ‡เชตเซเช‚ เชนเซ‹เชตเซเช‚ เชœเซ‹เชˆเช.

เช…เชฎเซ‡ เชธเซเชŸเซเชฐเซ€เชฎ เชกเซ‡เชŸเชพ เชชเซเชฐเซ‹เชธเซ‡เชธเชฟเช‚เช— เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชฌเชจเชพเชตเซ€เช เช›เซ€เช. เชญเชพเช— 2
เช†เช•เซƒเชคเชฟ 3. เชŸเซ‡เชฌเชฒ เชฒเซ‡เช†เช‰เชŸ

เชตเชชเชฐเชพเชถเช•เชฐเซเชคเชพ เชฒเซ‹เช— เชกเซ‡เชŸเชพ เชชเซเชฐเช•เชพเชถเชฟเชค เช•เชฐเซ€ เชฐเชนเซเชฏเซเช‚ เช›เซ‡

เชชเชฌ/เชธเชฌ เช เช…เชฎเชพเชฐเซ€ เชชเชพเช‡เชชเชฒเชพเช‡เชจเชจเซ‹ เชเช• เชฎเชนเชคเซเชตเชชเซ‚เชฐเซเชฃ เช˜เชŸเช• เช›เซ‡ เช•เชพเชฐเชฃ เช•เซ‡ เชคเซ‡ เชฌเชนเซเชตเชฟเชง เชธเซเชตเชคเช‚เชคเซเชฐ เชเชชเซเชฒเชฟเช•เซ‡เชถเชจเซ‹เชจเซ‡ เชเช•เชฌเซ€เชœเชพ เชธเชพเชฅเซ‡ เชตเชพเชคเชšเซ€เชค เช•เชฐเชตเชพเชจเซ€ เชฎเช‚เชœเซ‚เชฐเซ€ เช†เชชเซ‡ เช›เซ‡. เช–เชพเชธ เช•เชฐเซ€เชจเซ‡, เชคเซ‡ เชฎเชงเซเชฏเชธเซเชฅเซ€ เชคเชฐเซ€เช•เซ‡ เช•เชพเชฎ เช•เชฐเซ‡ เช›เซ‡ เชœเซ‡ เช…เชฎเชจเซ‡ เชเชชเซเชฒเชฟเช•เซ‡เชถเชจเซ‹ เชตเชšเซเชšเซ‡ เชธเช‚เชฆเซ‡เชถเชพ เชฎเซ‹เช•เชฒเชตเชพ เช…เชจเซ‡ เชชเซเชฐเชพเชชเซเชค เช•เชฐเชตเชพเชจเซ€ เชฎเช‚เชœเซ‚เชฐเซ€ เช†เชชเซ‡ เช›เซ‡. เชชเซเชฐเชฅเชฎ เชตเชธเซเชคเซ เชœเซ‡ เช†เชชเชฃเซ‡ เช•เชฐเชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡ เชคเซ‡ เชเช• เชตเชฟเชทเชฏ เชฌเชจเชพเชตเชตเชพเชจเซ€ เช›เซ‡. เช•เชจเซเชธเซ‹เชฒเชฎเชพเช‚ เชซเช•เซเชค Pub/Sub เชชเชฐ เชœเชพเช“ เช…เชจเซ‡ CREATE TOPIC เชชเชฐ เช•เซเชฒเชฟเช• เช•เชฐเซ‹.

เชจเซ€เชšเซ‡เชจเซ‹ เช•เซ‹เชก เช‰เชชเชฐ เชตเซเชฏเชพเช–เซเชฏเชพเชฏเชฟเชค เชฒเซ‹เช— เชกเซ‡เชŸเชพ เชœเชจเชฐเซ‡เชŸ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เช…เชฎเชพเชฐเซ€ เชธเซเช•เซเชฐเชฟเชชเซเชŸเชจเซ‡ เช•เซ‰เชฒ เช•เชฐเซ‡ เช›เซ‡ เช…เชจเซ‡ เชชเช›เซ€ เชฒเซ‹เช—เชจเซ‡ Pub/Sub เชชเชฐ เชœเซ‹เชกเซ‡ เช›เซ‡ เช…เชจเซ‡ เชฎเซ‹เช•เชฒเซ‡ เช›เซ‡. เช†เชชเชฃเซ‡ เชฎเชพเชคเซเชฐ เชเช• เชตเชธเซเชคเซ เชฌเชจเชพเชตเชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡ เชชเซเชฐเช•เชพเชถเช• เช•เซเชฒเชพเชฏเชจเซเชŸ, เชชเชฆเซเชงเชคเชฟเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เชตเชฟเชทเชฏเชจเซ‹ เชฎเชพเชฐเซเช— เชธเซเชชเชทเซเชŸ เช•เชฐเซ‹ topic_path เช…เชจเซ‡ เชซเช‚เช•เซเชถเชจเชจเซ‡ เช•เซ‰เชฒ เช•เชฐเซ‹ publish ั topic_path เช…เชจเซ‡ เชกเซ‡เชŸเชพ. เชฎเชนเซ‡เชฐเชฌเชพเชจเซ€ เช•เชฐเซ€เชจเซ‡ เชจเซ‹เช‚เชง เช•เชฐเซ‹ เช•เซ‡ เช…เชฎเซ‡ เช†เชฏเชพเชค เช•เชฐเซ€เช เช›เซ€เช generate_log_line เช…เชฎเชพเชฐเซ€ เชธเซเช•เซเชฐเชฟเชชเซเชŸเชฎเชพเช‚เชฅเซ€ stream_logs, เชคเซ‡เชฅเซ€ เช–เชพเชคเชฐเซ€ เช•เชฐเซ‹ เช•เซ‡ เช† เชซเชพเช‡เชฒเซ‹ เชธเชฎเชพเชจ เชซเซ‹เชฒเซเชกเชฐเชฎเชพเช‚ เช›เซ‡, เช…เชจเซเชฏเชฅเชพ เชคเชฎเชจเซ‡ เช†เชฏเชพเชค เชญเซ‚เชฒ เชฎเชณเชถเซ‡. เชชเช›เซ€ เช…เชฎเซ‡ เช†เชจเซ‡ เช…เชฎเชพเชฐเชพ เช—เซ‚เช—เชฒ เช•เชจเซเชธเซ‹เชฒ เชฆเซเชตเชพเชฐเชพ เช†เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เชšเชฒเชพเชตเซ€ เชถเช•เซ€เช เช›เซ€เช:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

เชœเชฒเชฆเซ€ เชซเชพเช‡เชฒ เชšเชพเชฒเซ‡ เช›เซ‡, เช…เชฎเซ‡ เชจเซ€เชšเซ‡เชจเซ€ เช†เช•เซƒเชคเชฟเชฎเชพเช‚ เชฌเชคเชพเชตเซเชฏเชพ เชชเซเชฐเชฎเชพเชฃเซ‡, เช•เชจเซเชธเซ‹เชฒ เชชเชฐ เชฒเซ‹เช— เชกเซ‡เชŸเชพเชจเซเช‚ เช†เช‰เชŸเชชเซเชŸ เชœเซ‹เชˆ เชถเช•เซ€เชถเซเช‚. เชœเซเชฏเชพเช‚ เชธเซเชงเซ€ เช…เชฎเซ‡ เช‰เชชเชฏเซ‹เช— เชจเชนเซ€เช‚ เช•เชฐเซ€เช เชคเซเชฏเชพเช‚ เชธเซเชงเซ€ เช† เชธเซเช•เซเชฐเชฟเชชเซเชŸ เช•เชพเชฎ เช•เชฐเชถเซ‡ เชธเซ€เชŸเซ€เช†เชฐเชเชฒ + เชธเซ€เชคเซ‡เชจเซ‡ เชชเซ‚เชฐเซเชฃ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡.

เช…เชฎเซ‡ เชธเซเชŸเซเชฐเซ€เชฎ เชกเซ‡เชŸเชพ เชชเซเชฐเซ‹เชธเซ‡เชธเชฟเช‚เช— เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชฌเชจเชพเชตเซ€เช เช›เซ€เช. เชญเชพเช— 2
เช†เช•เซƒเชคเชฟ 4. เช†เช‰เชŸเชชเซเชŸ publish_logs.py

เช…เชฎเชพเชฐเชพ เชชเชพเช‡เชชเชฒเชพเช‡เชจ เช•เซ‹เชก เชฒเช–เซ€ เชฐเชนเซเชฏเชพ เช›เซ€เช

เชนเชตเซ‡ เชœเซเชฏเชพเชฐเซ‡ เช…เชฎเชพเชฐเซ€ เชชเชพเชธเซ‡ เชฌเชงเซเช‚ เชคเซˆเชฏเชพเชฐ เช›เซ‡, เช…เชฎเซ‡ เชฎเชœเชพเชจเซ‹ เชญเชพเช— เชถเชฐเซ‚ เช•เชฐเซ€ เชถเช•เซ€เช เช›เซ€เช - เชฌเซ€เชฎ เช…เชจเซ‡ เชชเชพเชฏเชฅเซ‹เชจเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เช…เชฎเชพเชฐเซ€ เชชเชพเช‡เชชเชฒเชพเช‡เชจเชจเซเช‚ เช•เซ‹เชกเชฟเช‚เช—. เชฌเซ€เชฎ เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชฌเชจเชพเชตเชตเชพ เชฎเชพเชŸเซ‡, เช†เชชเชฃเซ‡ เชชเชพเช‡เชชเชฒเชพเช‡เชจ เช‘เชฌเซเชœเซ‡เช•เซเชŸ (p) เชฌเชจเชพเชตเชตเชพเชจเซ€ เชœเชฐเซ‚เชฐ เช›เซ‡. เชเช•เชตเชพเชฐ เช…เชฎเซ‡ เชชเชพเช‡เชชเชฒเชพเช‡เชจ เช‘เชฌเซเชœเซ‡เช•เซเชŸ เชฌเชจเชพเชตเซ€ เชฒเซ€เชงเชพ เชชเช›เซ€, เช…เชฎเซ‡ เช‘เชชเชฐเซ‡เชŸเชฐเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เชเช• เชชเช›เซ€ เชเช• เชฌเชนเซเชตเชฟเชง เช•เชพเชฐเซเชฏเซ‹ เชฒเชพเช—เซ เช•เชฐเซ€ เชถเช•เซ€เช เช›เซ€เช pipe (|). เชธเชพเชฎเชพเชจเซเชฏ เชฐเซ€เชคเซ‡, เชตเชฐเซเช•เชซเซเชฒเซ‹ เชจเซ€เชšเซ‡เชจเซ€ เช›เชฌเซ€ เชœเซ‡เชตเซ‹ เชฆเซ‡เช–เชพเชฏ เช›เซ‡.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

เช…เชฎเชพเชฐเชพ เช•เซ‹เชกเชฎเชพเช‚, เช…เชฎเซ‡ เชฌเซ‡ เช•เชธเซเชŸเชฎ เชซเช‚เช•เซเชถเชจ เชฌเชจเชพเชตเซ€เชถเซเช‚. เช•เชพเชฐเซเชฏ regex_cleanเชœเซ‡ เชกเซ‡เชŸเชพเชจเซ‡ เชธเซเช•เซ‡เชจ เช•เชฐเซ‡ เช›เซ‡ เช…เชจเซ‡ เชซเช‚เช•เซเชถเชจเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ PATTERNS เชฏเชพเชฆเซ€เชจเชพ เช†เชงเชพเชฐเซ‡ เช…เชจเซเชฐเซ‚เชช เชชเช‚เช•เซเชคเชฟเชจเซ‡ เชชเซเชจเชƒเชชเซเชฐเชพเชชเซเชค เช•เชฐเซ‡ เช›เซ‡ re.search. เชซเช‚เช•เซเชถเชจ เช…เชฒเซเชชเชตเชฟเชฐเชพเชฎเชฅเซ€ เช…เชฒเช— เช•เชฐเซ‡เชฒเซ€ เชธเซเชŸเซเชฐเชฟเช‚เช— เชชเชฐเชค เช•เชฐเซ‡ เช›เซ‡. เชœเซ‹ เชคเชฎเซ‡ เชจเชฟเชฏเชฎเชฟเชค เช…เชญเชฟเชตเซเชฏเช•เซเชคเชฟ เชจเชฟเชทเซเชฃเชพเชค เชจเชฅเซ€, เชคเซ‹ เชนเซเช‚ เช† เชคเชชเชพเชธเชตเชพเชจเซ€ เชญเชฒเชพเชฎเชฃ เช•เชฐเซเช‚ เช›เซเช‚ เชŸเซเชฏเซเชŸเซ‹เชฐเซ€เชฏเชฒ เช…เชจเซ‡ เช•เซ‹เชก เชคเชชเชพเชธเชตเชพ เชฎเชพเชŸเซ‡ เชจเซ‹เชŸเชชเซ‡เชกเชฎเชพเช‚ เชชเซเชฐเซ‡เช•เซเชŸเชฟเชธ เช•เชฐเซ‹. เช† เชชเช›เซ€ เช…เชฎเซ‡ เช•เชธเซเชŸเชฎ ParDo เชซเช‚เช•เซเชถเชจเชจเซ‡ เชตเซเชฏเชพเช–เซเชฏเชพเชฏเชฟเชค เช•เชฐเซ€เช เช›เซ€เช เชœเซ‡เชจเซ‡ เช•เชนเซ‡เชตเชพเชฏ เช›เซ‡ เชธเซเชชเซเชฒเชฟเชŸ, เชœเซ‡ เชธเชฎเชพเช‚เชคเชฐ เชชเซเชฐเช•เซเชฐเชฟเชฏเชพ เชฎเชพเชŸเซ‡ เชฌเซ€เชฎ เชŸเซเชฐเชพเชจเซเชธเชซเซ‹เชฐเซเชฎเชจเซ€ เชตเชฟเชตเชฟเชงเชคเชพ เช›เซ‡. เชชเชพเชฏเชฅเซ‹เชจเชฎเชพเช‚, เช† เชเช• เชตเชฟเชถเชฟเชทเซเชŸ เชฐเซ€เชคเซ‡ เช•เชฐเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡ - เช†เชชเชฃเซ‡ เชเช• เชตเชฐเซเช— เชฌเชจเชพเชตเชตเซ‹ เชœเซ‹เชˆเช เชœเซ‡ DoFn เชฌเซ€เชฎ เชตเชฐเซเช—เชฎเชพเช‚เชฅเซ€ เชตเชพเชฐเชธเชพเชฎเชพเช‚ เชฎเซ‡เชณเชตเซ‡ เช›เซ‡. เชธเซเชชเซเชฒเชฟเชŸ เชซเช‚เช•เซเชถเชจ เชชเชพเช›เชฒเชพ เชซเช‚เช•เซเชถเชจเชฎเชพเช‚เชฅเซ€ เชตเชฟเชถเซเชฒเซ‡เชทเชฟเชค เชชเช‚เช•เซเชคเชฟ เชฒเซ‡ เช›เซ‡ เช…เชจเซ‡ เช…เชฎเชพเชฐเชพ BigQuery เช•เซ‹เชทเซเชŸเช•เชฎเชพเช‚ เช•เซ‰เชฒเชฎเชจเชพ เชจเชพเชฎเซ‹เชจเซ‡ เช…เชจเซเชฐเซ‚เชช เช•เซ€ เชธเชพเชฅเซ‡ เชถเชฌเซเชฆเช•เซ‹เชถเซ‹เชจเซ€ เชธเซ‚เชšเชฟ เช†เชชเซ‡ เช›เซ‡. เช† เชซเช‚เช•เซเชถเชจ เชตเชฟเชถเซ‡ เชจเซ‹เช‚เชงเชตเชพ เชœเซ‡เชตเซเช‚ เช•เช‚เชˆเช• เช›เซ‡: เชฎเชพเชฐเซ‡ เช†เชฏเชพเชค เช•เชฐเชตเซเช‚ เชชเชกเซเชฏเซเช‚ datetime เชคเซ‡เชจเซ‡ เช•เชพเชฐเซเชฏ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เช•เชพเชฐเซเชฏเชจเซ€ เช…เช‚เชฆเชฐ. เชฎเชจเซ‡ เชซเชพเช‡เชฒเชจเซ€ เชถเชฐเซ‚เช†เชคเชฎเชพเช‚ เชเช• เช†เชฏเชพเชค เชญเซ‚เชฒ เชฎเชณเซ€ เชฐเชนเซ€ เชนเชคเซ€, เชœเซ‡ เชตเชฟเชšเชฟเชคเซเชฐ เชนเชคเซ€. เช† เชฏเชพเชฆเซ€ เชชเช›เซ€ เชซเช‚เช•เซเชถเชจเชฎเชพเช‚ เชฎเซ‹เช•เชฒเชตเชพเชฎเชพเช‚ เช†เชตเซ‡ เช›เซ‡ เชฌเชฟเช—เช•เซเชตเซ‡เชฐเซ€ เชฒเช–เซ‹, เชœเซ‡ เชซเช•เซเชค เชŸเซ‡เชฌเชฒเชฎเชพเช‚ เช†เชชเชฃเซ‹ เชกเซ‡เชŸเชพ เช‰เชฎเซ‡เชฐเซ‡ เช›เซ‡. เชฌเซ‡เชš เชกเซ‡เชŸเชพเชซเซเชฒเซ‹ เชœเซ‹เชฌ เช…เชจเซ‡ เชธเซเชŸเซเชฐเซ€เชฎเชฟเช‚เช— เชกเซ‡เชŸเชพเชซเซเชฒเซ‹ เชœเซ‹เชฌ เชฎเชพเชŸเซ‡เชจเซ‹ เช•เซ‹เชก เชจเซ€เชšเซ‡ เช†เชชเซ‡เชฒ เช›เซ‡. เชฌเซ‡เชš เช…เชจเซ‡ เชธเซเชŸเซเชฐเซ€เชฎเชฟเช‚เช— เช•เซ‹เชก เชตเชšเซเชšเซ‡เชจเซ‹ เชคเชซเชพเชตเชค เชเชŸเชฒเซ‹ เชœ เช›เซ‡ เช•เซ‡ เชฌเซ‡เชšเชฎเชพเช‚ เช†เชชเชฃเซ‡ CSV เชตเชพเช‚เชšเซ€เช เช›เซ€เช src_pathเชซเช‚เช•เซเชถเชจเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ ReadFromText เชฌเซ€เชฎ เชฎเชพเช‚เชฅเซ€.

เชฌเซ‡เชš เชกเซ‡เชŸเชพเชซเซเชฒเซ‹ เชœเซ‹เชฌ (เชฌเซ‡เชš เชชเซเชฐเซ‹เชธเซ‡เชธเชฟเช‚เช—)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

เชธเซเชŸเซเชฐเซ€เชฎเชฟเช‚เช— เชกเซ‡เชŸเชพเชซเซเชฒเซ‹ เชœเซ‹เชฌ (เชธเซเชŸเซเชฐเซ€เชฎ เชชเซเชฐเซ‹เชธเซ‡เชธเชฟเช‚เช—)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

เช•เชจเซเชตเซ‡เชฏเชฐ เชถเชฐเซ‚ เช•เชฐเซ€ เชฐเชนเซเชฏเชพ เช›เซ€เช

เช…เชฎเซ‡ เชชเชพเช‡เชชเชฒเชพเช‡เชจเชจเซ‡ เชตเชฟเชตเชฟเชง เชฐเซ€เชคเซ‡ เชšเชฒเชพเชตเซ€ เชถเช•เซ€เช เช›เซ€เช. เชœเซ‹ เช…เชฎเซ‡ เช‡เชšเซเช›เซ€เช เชคเซ‹, GCP เชฎเชพเช‚ เชฆเซ‚เชฐเชธเซเชฅ เชฐเซ€เชคเซ‡ เชฒเซ‹เช— เช‡เชจ เช•เชฐเชคเซ€ เชตเช–เชคเซ‡ เช…เชฎเซ‡ เชคเซ‡เชจเซ‡ เชŸเชฐเซเชฎเชฟเชจเชฒเชฅเซ€ เชธเซเชฅเชพเชจเชฟเช• เชฐเซ€เชคเซ‡ เชšเชฒเชพเชตเซ€ เชถเช•เซ€เช เช›เซ€เช.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

เชœเซ‹ เช•เซ‡, เช…เชฎเซ‡ เชกเซ‡เชŸเชพเชซเซเชฒเซ‹เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เชคเซ‡เชจเซ‡ เชšเชฒเชพเชตเชตเชพ เชœเชˆ เชฐเชนเซเชฏเชพ เช›เซ€เช. เช…เชฎเซ‡ เชจเซ€เชšเซ‡เชจเชพ เชœเชฐเซ‚เชฐเซ€ เชชเชฐเชฟเชฎเชพเชฃเซ‹ เชธเซ‡เชŸ เช•เชฐเซ€เชจเซ‡ เชจเซ€เชšเซ‡เชจเชพ เช†เชฆเซ‡เชถเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€เชจเซ‡ เช† เช•เชฐเซ€ เชถเช•เซ€เช เช›เซ€เช.

  • project โ€” เชคเชฎเชพเชฐเชพ GCP เชชเซเชฐเซ‹เชœเซ‡เช•เซเชŸเชจเซเช‚ ID.
  • runner เชเช• เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชฐเชจเชฐ เช›เซ‡ เชœเซ‡ เชคเชฎเชพเชฐเชพ เชชเซเชฐเซ‹เช—เซเชฐเชพเชฎเชจเซเช‚ เชตเชฟเชถเซเชฒเซ‡เชทเชฃ เช•เชฐเชถเซ‡ เช…เชจเซ‡ เชคเชฎเชพเชฐเซ€ เชชเชพเช‡เชชเชฒเชพเช‡เชจเชจเซเช‚ เชจเชฟเชฐเซเชฎเชพเชฃ เช•เชฐเชถเซ‡. เช•เซเชฒเชพเช‰เชกเชฎเชพเช‚ เชšเชฒเชพเชตเชตเชพ เชฎเชพเชŸเซ‡, เชคเชฎเชพเชฐเซ‡ เชกเซ‡เชŸเชพเชซเซเชฒเซ‹เชฐเชจเชฐเชจเซ‹ เช‰เชฒเซเชฒเซ‡เช– เช•เชฐเชตเซ‹ เช†เชตเชถเซเชฏเช• เช›เซ‡.
  • staging_location โ€” เช•เชพเชฐเซเชฏ เช•เชฐเซ€ เชฐเชนเซ‡เชฒเชพ เชชเซเชฐเซ‹เชธเซ‡เชธเชฐเซ‹ เชฆเซเชตเชพเชฐเชพ เชœเชฐเซ‚เชฐเซ€ เชˆเชจเซเชกเซ‡เช•เซเชธเซ€เช‚เช— เช•เซ‹เชก เชชเซ‡เช•เซ‡เชœเซ‹ เชฎเชพเชŸเซ‡ เช•เซเชฒเชพเช‰เชก เชกเซ‡เชŸเชพเชซเซเชฒเซ‹ เช•เซเชฒเชพเช‰เชก เชธเซเชŸเซ‹เชฐเซ‡เชœเชจเซ‹ เชฎเชพเชฐเซเช—.
  • temp_location - เชœเซเชฏเชพเชฐเซ‡ เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชšเชพเชฒเซ€ เชฐเชนเซ€ เชนเซ‹เชฏ เชคเซเชฏเชพเชฐเซ‡ เชฌเชจเชพเชตเซ‡เชฒ เช•เชพเชฎเชšเชฒเชพเช‰ เชœเซ‹เชฌ เชซเชพเช‡เชฒเซ‹เชจเซ‡ เชธเซเชŸเซ‹เชฐ เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เช•เซเชฒเชพเช‰เชก เชกเซ‡เชŸเชพเชซเซเชฒเซ‹ เช•เซเชฒเชพเช‰เชก เชธเซเชŸเซ‹เชฐเซ‡เชœเชจเซ‹ เชชเชพเชฅ.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

เชœเซเชฏเชพเชฐเซ‡ เช† เช†เชฆเซ‡เชถ เชšเชพเชฒเซ€ เชฐเชนเซเชฏเซ‹ เชนเซ‹เชฏ, เชคเซเชฏเชพเชฐเซ‡ เช…เชฎเซ‡ google เช•เชจเซเชธเซ‹เชฒเชฎเชพเช‚ DataFlow เชŸเซ‡เชฌ เชชเชฐ เชœเชˆ เชถเช•เซ€เช เช›เซ€เช เช…เชจเซ‡ เช…เชฎเชพเชฐเซ€ เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชœเซ‹เชˆ เชถเช•เซ€เช เช›เซ€เช. เชœเซเชฏเชพเชฐเซ‡ เช†เชชเชฃเซ‡ เชชเชพเชˆเชชเชฒเชพเชˆเชจ เชชเชฐ เช•เซเชฒเชฟเช• เช•เชฐเซ€เช เช›เซ€เช, เชคเซเชฏเชพเชฐเซ‡ เช†เชชเชฃเซ‡ เช†เช•เซƒเชคเชฟ 4 เชœเซ‡เชตเซเช‚ เช•เช‚เชˆเช• เชœเซ‹เชตเซเช‚ เชœเซ‹เชˆเช. เชกเซ€เชฌเช—เซ€เช‚เช— เชนเซ‡เชคเซเช“ เชฎเชพเชŸเซ‡, เชตเชฟเช—เชคเชตเชพเชฐ เชฒเซ‹เช— เชœเซ‹เชตเชพ เชฎเชพเชŸเซ‡ เชฒเซ‹เช—เซเชธ เช…เชจเซ‡ เชชเช›เซ€ เชธเซเชŸเซ‡เช•เชกเซเชฐเชพเชˆเชตเชฐ เชชเชฐ เชœเชตเชพเชจเซเช‚ เช–เซ‚เชฌ เชœ เชฎเชฆเชฆเชฐเซ‚เชช เชฅเชˆ เชถเช•เซ‡ เช›เซ‡. เช†เชจเชพเชฅเซ€ เชฎเชจเซ‡ เช…เชธเช‚เช–เซเชฏ เช•เซ‡เชธเซ‹เชฎเชพเช‚ เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชธเชฎเชธเซเชฏเชพเช“ เช‰เช•เซ‡เชฒเชตเชพเชฎเชพเช‚ เชฎเชฆเชฆ เชฎเชณเซ€ เช›เซ‡.

เช…เชฎเซ‡ เชธเซเชŸเซเชฐเซ€เชฎ เชกเซ‡เชŸเชพ เชชเซเชฐเซ‹เชธเซ‡เชธเชฟเช‚เช— เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชฌเชจเชพเชตเซ€เช เช›เซ€เช. เชญเชพเช— 2
เช†เช•เซƒเชคเชฟ 4: เชฌเซ€เชฎ เช•เชจเซเชตเซ‡เชฏเชฐ

BigQuery เชฎเชพเช‚ เช…เชฎเชพเชฐเซ‹ เชกเซ‡เชŸเชพ เชเช•เซเชธเซ‡เชธ เช•เชฐเซ‹

เชคเซ‡เชฅเซ€, เช…เชฎเชพเชฐเซ€ เชชเชพเชธเซ‡ เชชเชนเซ‡เชฒเชพเชฅเซ€ เชœ เชเช• เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชนเซ‹เชตเซ€ เชœเซ‹เชˆเช เชœเซ‡เชฎเชพเช‚ เช…เชฎเชพเชฐเชพ เช•เซ‹เชทเซเชŸเช•เชฎเชพเช‚ เชกเซ‡เชŸเชพ เชตเชนเซ‡เชคเซ‹ เชนเซ‹เชฏ. เช†เชจเซ‡ เชšเช•เชพเชธเชตเชพ เชฎเชพเชŸเซ‡, เช…เชฎเซ‡ BigQuery เชชเชฐ เชœเชˆเชจเซ‡ เชกเซ‡เชŸเชพ เชœเซ‹เชˆ เชถเช•เซ€เช เช›เซ€เช. เชจเซ€เชšเซ‡เชจเชพ เช†เชฆเซ‡เชถเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซเชฏเชพ เชชเช›เซ€ เชคเชฎเชพเชฐเซ‡ เชกเซ‡เชŸเชพเชธเซ‡เชŸเชจเซ€ เชชเซเชฐเชฅเชฎ เช•เซ‡เชŸเชฒเซ€เช• เชชเช‚เช•เซเชคเชฟเช“ เชœเซ‹เชตเซ€ เชœเซ‹เชˆเช. เชนเชตเซ‡ เชœเซเชฏเชพเชฐเซ‡ เช…เชฎเชพเชฐเซ€ เชชเชพเชธเซ‡ BigQuery เชฎเชพเช‚ เชกเซ‡เชŸเชพ เชธเช‚เช—เซเชฐเชนเชฟเชค เช›เซ‡, เชคเซเชฏเชพเชฐเซ‡ เช…เชฎเซ‡ เชตเชงเซ เชตเชฟเชถเซเชฒเซ‡เชทเชฃ เช•เชฐเซ€ เชถเช•เซ€เช เช›เซ€เช, เชคเซ‡เชฎเชœ เชธเชพเชฅเซ€เชฆเชพเชฐเซ‹ เชธเชพเชฅเซ‡ เชกเซ‡เชŸเชพ เชถเซ‡เชฐ เช•เชฐเซ€ เชถเช•เซ€เช เช›เซ€เช เช…เชจเซ‡ เชตเซเชฏเชตเชธเชพเชฏเชฟเช• เชชเซเชฐเชถเซเชจเซ‹เชจเชพ เชœเชตเชพเชฌ เช†เชชเชตเชพเชจเซเช‚ เชถเชฐเซ‚ เช•เชฐเซ€ เชถเช•เซ€เช เช›เซ€เช.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

เช…เชฎเซ‡ เชธเซเชŸเซเชฐเซ€เชฎ เชกเซ‡เชŸเชพ เชชเซเชฐเซ‹เชธเซ‡เชธเชฟเช‚เช— เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชฌเชจเชพเชตเซ€เช เช›เซ€เช. เชญเชพเช— 2
เช†เช•เซƒเชคเชฟ 5: BigQuery

เชจเชฟเชทเซเช•เชฐเซเชท

เช…เชฎเซ‡ เช†เชถเชพ เชฐเชพเช–เซ€เช เช›เซ€เช เช•เซ‡ เช† เชชเซ‹เชธเซเชŸ เชธเซเชŸเซเชฐเซ€เชฎเชฟเช‚เช— เชกเซ‡เชŸเชพ เชชเชพเช‡เชชเชฒเชพเช‡เชจ เชฌเชจเชพเชตเชตเชพ เชคเซ‡เชฎเชœ เชกเซ‡เชŸเชพเชจเซ‡ เชตเชงเซ เชธเซเชฒเชญ เชฌเชจเชพเชตเชตเชพเชจเชพ เชฎเชพเชฐเซเช—เซ‹ เชถเซ‹เชงเชตเชพเชจเชพ เชเช• เช‰เชชเชฏเซ‹เช—เซ€ เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡ เชธเซ‡เชตเชพ เช†เชชเชถเซ‡. เช† เชซเซ‹เชฐเซเชฎเซ‡เชŸเชฎเชพเช‚ เชกเซ‡เชŸเชพ เชธเซเชŸเซ‹เชฐ เช•เชฐเชตเชพเชฅเซ€ เช…เชฎเชจเซ‡ เช˜เชฃเชพ เชซเชพเชฏเชฆเชพ เชฎเชณเซ‡ เช›เซ‡. เชนเชตเซ‡ เช†เชชเชฃเซ‡ เชฎเชนเชคเซเชตเชชเซ‚เชฐเซเชฃ เชชเซเชฐเชถเซเชจเซ‹เชจเชพ เชœเชตเชพเชฌ เช†เชชเชตเชพเชจเซเช‚ เชถเชฐเซ‚ เช•เชฐเซ€ เชถเช•เซ€เช เช›เซ€เช เชœเซ‡เชฎ เช•เซ‡ เช•เซ‡เชŸเชฒเชพ เชฒเซ‹เช•เซ‹ เช…เชฎเชพเชฐเซ€ เชชเซเชฐเซ‹เชกเช•เซเชŸเชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ€ เชฐเชนเซเชฏเชพ เช›เซ‡? เชถเซเช‚ เชคเชฎเชพเชฐเซ‹ เชตเชชเชฐเชพเชถเช•เชฐเซเชคเชพ เช†เชงเชพเชฐ เชธเชฎเชฏ เชธเชพเชฅเซ‡ เชตเชงเซ€ เชฐเชนเซเชฏเซ‹ เช›เซ‡? เชฒเซ‹เช•เซ‹ เช‰เชคเซเชชเชพเชฆเชจเชจเชพ เช•เชฏเชพ เชชเชพเชธเชพเช“ เชธเชพเชฅเซ‡ เชธเซŒเชฅเซ€ เชตเชงเซ เช•เซเชฐเชฟเชฏเชพเชชเซเชฐเชคเชฟเช•เซเชฐเชฟเชฏเชพ เช•เชฐเซ‡ เช›เซ‡? เช…เชจเซ‡ เชถเซเช‚ เชเชตเซ€ เชญเซ‚เชฒเซ‹ เช›เซ‡ เชœเซเชฏเชพเช‚ เชจ เชนเซ‹เชตเซ€ เชœเซ‹เชˆเช? เช† เชเชตเชพ เชชเซเชฐเชถเซเชจเซ‹ เช›เซ‡ เชœเซ‡ เชธเช‚เชธเซเชฅเชพเชจเชพ เชนเชฟเชคเชจเชพ เชฐเชนเซ‡เชถเซ‡. เช† เชชเซเชฐเชถเซเชจเซ‹เชจเชพ เชœเชตเชพเชฌเซ‹เชฎเชพเช‚เชฅเซ€ เช‰เชฆเซเชฆเชญเชตเชคเซ€ เช†เช‚เชคเชฐเชฆเซƒเชทเซเชŸเชฟเชจเชพ เช†เชงเชพเชฐเซ‡, เช…เชฎเซ‡ เช‰เชคเซเชชเชพเชฆเชจเชจเซ‡ เชธเซเชงเชพเชฐเซ€ เชถเช•เซ€เช เช›เซ€เช เช…เชจเซ‡ เชตเชชเชฐเชพเชถเช•เชฐเซเชคเชพเชจเซ€ เชธเช—เชพเชˆ เชตเชงเชพเชฐเซ€ เชถเช•เซ€เช เช›เซ€เช.

เชฌเซ€เชฎ เช† เชชเซเชฐเช•เชพเชฐเชจเซ€ เช•เชธเชฐเชค เชฎเชพเชŸเซ‡ เช–เชฐเซ‡เช–เชฐ เช‰เชชเชฏเซ‹เช—เซ€ เช›เซ‡ เช…เชจเซ‡ เชคเซ‡เชจเชพ เช‰เชชเชฏเซ‹เช—เชจเชพ เช…เชจเซเชฏ เชฐเชธเชชเซเชฐเชฆ เช•เชฟเชธเซเชธเชพเช“ เชชเชฃ เช›เซ‡. เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡, เชคเชฎเซ‡ เชฐเซ€เช…เชฒ เชŸเชพเช‡เชฎเชฎเชพเช‚ เชธเซเชŸเซ‹เช• เชŸเชฟเช• เชกเซ‡เชŸเชพเชจเซเช‚ เชตเชฟเชถเซเชฒเซ‡เชทเชฃ เช•เชฐเชตเชพ เช…เชจเซ‡ เชตเชฟเชถเซเชฒเซ‡เชทเชฃเชจเชพ เช†เชงเชพเชฐเซ‡ เชธเซ‹เชฆเชพ เช•เชฐเชตเชพ เชฎเชพเช—เซ‹ เช›เซ‹, เช•เชฆเชพเชš เชคเชฎเชพเชฐเซ€ เชชเชพเชธเซ‡ เชตเชพเชนเชจเซ‹เชฎเชพเช‚เชฅเซ€ เช†เชตเชคเชพ เชธเซ‡เชจเซเชธเชฐ เชกเซ‡เชŸเชพ เช›เซ‡ เช…เชจเซ‡ เชคเชฎเซ‡ เชŸเซเชฐเชพเชซเชฟเช• เชธเซเชคเชฐเชจเซ€ เช—เชฃเชคเชฐเซ€เชจเซ€ เช—เชฃเชคเชฐเซ€ เช•เชฐเชตเชพ เชฎเชพเช—เซ‹ เช›เซ‹. เชคเชฎเซ‡, เช‰เชฆเชพเชนเชฐเชฃ เชคเชฐเซ€เช•เซ‡, เชเช• เช—เซ‡เชฎเชฟเช‚เช— เช•เช‚เชชเชจเซ€ เชชเชฃ เชฌเชจเซ€ เชถเช•เซ‹ เช›เซ‹ เชœเซ‡ เชตเชชเชฐเชพเชถเช•เชฐเซเชคเชพเชจเซ‹ เชกเซ‡เชŸเชพ เชเช•เชคเซเชฐเชฟเชค เช•เชฐเซ‡ เช›เซ‡ เช…เชจเซ‡ เชฎเซเช–เซเชฏ เชฎเซ‡เชŸเซเชฐเชฟเช•เซเชธเชจเซ‡ เชŸเซเชฐเซ…เช• เช•เชฐเชตเชพ เชฎเชพเชŸเซ‡ เชกเซ‡เชถเชฌเซ‹เชฐเซเชกเซเชธ เชฌเชจเชพเชตเชตเชพ เชฎเชพเชŸเซ‡ เชคเซ‡เชจเซ‹ เช‰เชชเชฏเซ‹เช— เช•เชฐเซ‡ เช›เซ‡. เช เซ€เช• เช›เซ‡, เชธเชœเซเชœเชจเซ‹, เช† เชฌเซ€เชœเซ€ เชชเซ‹เชธเซเชŸ เชฎเชพเชŸเซ‡เชจเซ‹ เชตเชฟเชทเชฏ เช›เซ‡, เชตเชพเช‚เชšเชตเชพ เชฌเชฆเชฒ เช†เชญเชพเชฐ, เช…เชจเซ‡ เชœเซ‡เช“ เชธเช‚เชชเซ‚เชฐเซเชฃ เช•เซ‹เชก เชœเซ‹เชตเชพ เชฎเชพเช—เซ‡ เช›เซ‡ เชคเซ‡เชฎเชจเชพ เชฎเชพเชŸเซ‡ เชจเซ€เชšเซ‡ เชฎเชพเชฐเชพ GitHub เชจเซ€ เชฒเชฟเช‚เช• เช›เซ‡.

https://github.com/DFoly/User_log_pipeline

เชฌเชธ. เชญเชพเช— เชเช• เชตเชพเช‚เชšเซ‹.

เชธเซ‹เชฐเซเชธ: www.habr.com

เชเช• เชŸเชฟเชชเซเชชเชฃเซ€ เช‰เชฎเซ‡เชฐเซ‹