เชเซเชฎ เชเซ เชฌเชงเชพ. เช
เชฎเซ เชฒเซเชเชจเชพ เช
เชเชคเชฟเชฎ เชญเชพเชเชจเซ เช
เชจเซเชตเชพเชฆ เชถเซเชฐ เชเชฐเซ เชฐเชนเซเชฏเชพ เชเซเช, เชเซ เชเชพเชธ เชเชฐเซเชจเซ เชเซเชฐเซเชธเชจเชพ เชตเชฟเชฆเซเชฏเชพเชฐเซเชฅเซเช เชฎเชพเชเซ เชคเซเชฏเชพเชฐ เชเชฐเชตเชพเชฎเชพเช เชเชตเซเชฏเซ เชเซ.
เชฐเซเช เชฒ-เชเชพเชเชฎ เชชเชพเชเชชเชฒเชพเชเชจเซเชธ เชฎเชพเชเซ เช เชชเชพเชเซ เชฌเซเชฎ เช เชจเซ เชกเซเชเชพเชซเซเชฒเซ
Google Cloud เชธเซเช เชเชฐเซ เชฐเชนเซเชฏเชพเช เชเซเช
เชจเซเชเชง: เชฎเซเช เชชเชพเชเชชเชฒเชพเชเชจ เชเชฒเชพเชตเชตเชพ เช เชจเซ เชเชธเซเชเชฎ เชฒเซเช เชกเซเชเชพ เชชเซเชฐเชเชพเชถเชฟเชค เชเชฐเชตเชพ เชฎเชพเชเซ Google เชเซเชฒเชพเชเชก เชถเซเชฒเชจเซ เชเชชเชฏเซเช เชเชฐเซเชฏเซ เชเซ เชเชพเชฐเชฃ เชเซ เชฎเชจเซ เชชเชพเชฏเชฅเซเชจ 3 เชฎเชพเช เชชเชพเชเชชเชฒเชพเชเชจ เชเชฒเชพเชตเชตเชพเชฎเชพเช เชฎเซเชถเซเชเซเชฒเซ เชเชตเซ เชฐเชนเซ เชนเชคเซ. Google เชเซเชฒเชพเชเชก เชถเซเชฒ เชชเชพเชฏเชฅเซเชจ 2 เชจเซ เชเชชเชฏเซเช เชเชฐเซ เชเซ, เชเซ เช เชชเชพเชเซ เชฌเซเชฎ เชธเชพเชฅเซ เชตเชงเซ เชธเซเชธเชเชเชค เชเซ.
เชชเชพเชเชชเชฒเชพเชเชจ เชถเชฐเซ เชเชฐเชตเชพ เชฎเชพเชเซ, เชเชชเชฃเซ เชธเซเชเชฟเชเชเซเชธเชฎเชพเช เชฅเซเชกเซเช เชเซเชฆเชตเซเช เชชเชกเชถเซ. เชคเชฎเชพเชฐเชพเชฎเชพเชเชฅเซ เชเซเชฎเชฃเซ เช
เชเชพเช GCP เชจเซ เชเชชเชฏเซเช เชเชฐเซเชฏเซ เชจเชฅเซ, เชคเชฎเชพเชฐเซ เชเชฎเชพเช เชฆเชฐเซเชถเชพเชตเซเชฒ เชจเซเชเซเชจเชพ 6 เชชเชเชฒเชพเชเชจเซ เช
เชจเซเชธเชฐเชตเชพเชจเซ เชเชฐเซเชฐ เชชเชกเชถเซ
เช เชชเชเซ, เช
เชฎเชพเชฐเซ เช
เชฎเชพเชฐเซ เชธเซเชเซเชฐเชฟเชชเซเชเซเชธเชจเซ Google Cloud Storage เชชเชฐ เช
เชชเชฒเซเชก เชเชฐเชตเชพเชจเซ เช
เชจเซ เชคเซเชจเซ เช
เชฎเชพเชฐเชพ Google Cloud Shel เชชเชฐ เชเซเชชเชฟ เชเชฐเชตเชพเชจเซ เชเชฐเซเชฐ เชชเชกเชถเซ. เชเซเชฒเชพเชเชก เชธเซเชเซเชฐเซเช เชชเชฐ เช
เชชเชฒเซเชก เชเชฐเชตเซเช เชเชเชฆเชฎ เชคเซเชเซเช เชเซ (เชตเชฐเซเชฃเชจ เชฎเชณเซ เชถเชเซ เชเซ
เชเชฟเชคเซเชฐ 2
เชซเชพเชเชฒเซเชจเซ เชจเชเชฒ เชเชฐเชตเชพ เช เชจเซ เชเชฐเซเชฐเซ เชฒเชพเชเชฌเซเชฐเซเชฐเซเช เชเชจเซเชธเซเชเซเชฒ เชเชฐเชตเชพ เชฎเชพเชเซ เชเชชเชฃเชจเซ เชเซ เชเชฆเซเชถเซเชจเซ เชเชฐเซเชฐ เชเซ เชคเซ เชจเซเชเซ เชธเซเชเชฟเชฌเชฆเซเชง เชเซ.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
เช เชฎเชพเชฐเชพ เชกเซเชเชพเชฌเซเช เช เชจเซ เชเซเชฌเชฒ เชฌเชจเชพเชตเซ เชฐเชนเซเชฏเชพ เชเซเช
เชเชเชตเชพเชฐ เช
เชฎเซ เชธเซเชเช
เชช เชธเชเชฌเชเชงเชฟเชค เชคเชฎเชพเชฎ เชชเชเชฒเชพเชเช เชชเซเชฐเซเชฃ เชเชฐเซ เชฒเซเชงเชพ เชชเชเซ, เช
เชฎเซ เชเชเชณเชจเซเช เชเชพเชฎ BigQuery เชฎเชพเช เชกเซเชเชพเชธเซเช เช
เชจเซ เชเซเชฌเชฒ เชฌเชจเชพเชตเชตเชพเชจเซ เชเซ. เช เชเชฐเชตเชพเชจเซ เชเชฃเซ เชฐเซเชคเซ เชเซ, เชชเชฐเชเชคเซ เชธเซเชฅเซ เชธเชฐเชณ เช เชเซ เชเซ เชชเซเชฐเชฅเชฎ เชกเซเชเชพเชธเซเช เชฌเชจเชพเชตเซเชจเซ Google Cloud เชเชจเซเชธเซเชฒเชจเซ เชเชชเชฏเซเช เชเชฐเชตเซ. เชคเชฎเซ เชจเซเชเซเชจเชพ เชชเชเชฒเชพเชเชจเซ เช
เชจเซเชธเชฐเซ เชถเชเซ เชเซ
เชเชเซเชคเชฟ 3. เชเซเชฌเชฒ เชฒเซเชเชเช
เชตเชชเชฐเชพเชถเชเชฐเซเชคเชพ เชฒเซเช เชกเซเชเชพ เชชเซเชฐเชเชพเชถเชฟเชค เชเชฐเซ เชฐเชนเซเชฏเซเช เชเซ
เชชเชฌ/เชธเชฌ เช เช เชฎเชพเชฐเซ เชชเชพเชเชชเชฒเชพเชเชจเชจเซ เชเช เชฎเชนเชคเซเชตเชชเซเชฐเซเชฃ เชเชเช เชเซ เชเชพเชฐเชฃ เชเซ เชคเซ เชฌเชนเซเชตเชฟเชง เชธเซเชตเชคเชเชคเซเชฐ เชเชชเซเชฒเชฟเชเซเชถเชจเซเชจเซ เชเชเชฌเซเชเชพ เชธเชพเชฅเซ เชตเชพเชคเชเซเชค เชเชฐเชตเชพเชจเซ เชฎเชเชเซเชฐเซ เชเชชเซ เชเซ. เชเชพเชธ เชเชฐเซเชจเซ, เชคเซ เชฎเชงเซเชฏเชธเซเชฅเซ เชคเชฐเซเชเซ เชเชพเชฎ เชเชฐเซ เชเซ เชเซ เช เชฎเชจเซ เชเชชเซเชฒเชฟเชเซเชถเชจเซ เชตเชเซเชเซ เชธเชเชฆเซเชถเชพ เชฎเซเชเชฒเชตเชพ เช เชจเซ เชชเซเชฐเชพเชชเซเชค เชเชฐเชตเชพเชจเซ เชฎเชเชเซเชฐเซ เชเชชเซ เชเซ. เชชเซเชฐเชฅเชฎ เชตเชธเซเชคเซ เชเซ เชเชชเชฃเซ เชเชฐเชตเชพเชจเซ เชเชฐเซเชฐ เชเซ เชคเซ เชเช เชตเชฟเชทเชฏ เชฌเชจเชพเชตเชตเชพเชจเซ เชเซ. เชเชจเซเชธเซเชฒเชฎเชพเช เชซเชเซเชค Pub/Sub เชชเชฐ เชเชพเช เช เชจเซ CREATE TOPIC เชชเชฐ เชเซเชฒเชฟเช เชเชฐเซ.
เชจเซเชเซเชจเซ เชเซเชก เชเชชเชฐ เชตเซเชฏเชพเชเซเชฏเชพเชฏเชฟเชค เชฒเซเช เชกเซเชเชพ เชเชจเชฐเซเช เชเชฐเชตเชพ เชฎเชพเชเซ เช
เชฎเชพเชฐเซ เชธเซเชเซเชฐเชฟเชชเซเชเชจเซ เชเซเชฒ เชเชฐเซ เชเซ เช
เชจเซ เชชเชเซ เชฒเซเชเชจเซ Pub/Sub เชชเชฐ เชเซเชกเซ เชเซ เช
เชจเซ เชฎเซเชเชฒเซ เชเซ. เชเชชเชฃเซ เชฎเชพเชคเซเชฐ เชเช เชตเชธเซเชคเซ เชฌเชจเชพเชตเชตเชพเชจเซ เชเชฐเซเชฐ เชเซ เชชเซเชฐเชเชพเชถเช เชเซเชฒเชพเชฏเชจเซเช, เชชเชฆเซเชงเชคเชฟเชจเซ เชเชชเชฏเซเช เชเชฐเซเชจเซ เชตเชฟเชทเชฏเชจเซ เชฎเชพเชฐเซเช เชธเซเชชเชทเซเช เชเชฐเซ topic_path
เช
เชจเซ เชซเชเชเซเชถเชจเชจเซ เชเซเชฒ เชเชฐเซ publish
ั topic_path
เช
เชจเซ เชกเซเชเชพ. เชฎเชนเซเชฐเชฌเชพเชจเซ เชเชฐเซเชจเซ เชจเซเชเชง เชเชฐเซ เชเซ เช
เชฎเซ เชเชฏเชพเชค เชเชฐเซเช เชเซเช generate_log_line
เช
เชฎเชพเชฐเซ เชธเซเชเซเชฐเชฟเชชเซเชเชฎเชพเชเชฅเซ stream_logs
, เชคเซเชฅเซ เชเชพเชคเชฐเซ เชเชฐเซ เชเซ เช เชซเชพเชเชฒเซ เชธเชฎเชพเชจ เชซเซเชฒเซเชกเชฐเชฎเชพเช เชเซ, เช
เชจเซเชฏเชฅเชพ เชคเชฎเชจเซ เชเชฏเชพเชค เชญเซเชฒ เชฎเชณเชถเซ. เชชเชเซ เช
เชฎเซ เชเชจเซ เช
เชฎเชพเชฐเชพ เชเซเชเชฒ เชเชจเซเชธเซเชฒ เชฆเซเชตเชพเชฐเชพ เชเชจเซ เชเชชเชฏเซเช เชเชฐเซเชจเซ เชเชฒเชพเชตเซ เชถเชเซเช เชเซเช:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
เชเชฒเชฆเซ เชซเชพเชเชฒ เชเชพเชฒเซ เชเซ, เช เชฎเซ เชจเซเชเซเชจเซ เชเชเซเชคเชฟเชฎเชพเช เชฌเชคเชพเชตเซเชฏเชพ เชชเซเชฐเชฎเชพเชฃเซ, เชเชจเซเชธเซเชฒ เชชเชฐ เชฒเซเช เชกเซเชเชพเชจเซเช เชเชเชเชชเซเช เชเซเช เชถเชเซเชถเซเช. เชเซเชฏเชพเช เชธเซเชงเซ เช เชฎเซ เชเชชเชฏเซเช เชจเชนเซเช เชเชฐเซเช เชคเซเชฏเชพเช เชธเซเชงเซ เช เชธเซเชเซเชฐเชฟเชชเซเช เชเชพเชฎ เชเชฐเชถเซ เชธเซเชเซเชเชฐเชเชฒ + เชธเซเชคเซเชจเซ เชชเซเชฐเซเชฃ เชเชฐเชตเชพ เชฎเชพเชเซ.
เชเชเซเชคเชฟ 4. เชเชเชเชชเซเช publish_logs.py
เช เชฎเชพเชฐเชพ เชชเชพเชเชชเชฒเชพเชเชจ เชเซเชก เชฒเชเซ เชฐเชนเซเชฏเชพ เชเซเช
เชนเชตเซ เชเซเชฏเชพเชฐเซ เช
เชฎเชพเชฐเซ เชชเชพเชธเซ เชฌเชงเซเช เชคเซเชฏเชพเชฐ เชเซ, เช
เชฎเซ เชฎเชเชพเชจเซ เชญเชพเช เชถเชฐเซ เชเชฐเซ เชถเชเซเช เชเซเช - เชฌเซเชฎ เช
เชจเซ เชชเชพเชฏเชฅเซเชจเชจเซ เชเชชเชฏเซเช เชเชฐเซเชจเซ เช
เชฎเชพเชฐเซ เชชเชพเชเชชเชฒเชพเชเชจเชจเซเช เชเซเชกเชฟเชเช. เชฌเซเชฎ เชชเชพเชเชชเชฒเชพเชเชจ เชฌเชจเชพเชตเชตเชพ เชฎเชพเชเซ, เชเชชเชฃเซ เชชเชพเชเชชเชฒเชพเชเชจ เชเชฌเซเชเซเชเซเช (p) เชฌเชจเชพเชตเชตเชพเชจเซ เชเชฐเซเชฐ เชเซ. เชเชเชตเชพเชฐ เช
เชฎเซ เชชเชพเชเชชเชฒเชพเชเชจ เชเชฌเซเชเซเชเซเช เชฌเชจเชพเชตเซ เชฒเซเชงเชพ เชชเชเซ, เช
เชฎเซ เชเชชเชฐเซเชเชฐเชจเซ เชเชชเชฏเซเช เชเชฐเซเชจเซ เชเช เชชเชเซ เชเช เชฌเชนเซเชตเชฟเชง เชเชพเชฐเซเชฏเซ เชฒเชพเชเซ เชเชฐเซ เชถเชเซเช เชเซเช pipe (|)
. เชธเชพเชฎเชพเชจเซเชฏ เชฐเซเชคเซ, เชตเชฐเซเชเชซเซเชฒเซ เชจเซเชเซเชจเซ เชเชฌเซ เชเซเชตเซ เชฆเซเชเชพเชฏ เชเซ.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
เช
เชฎเชพเชฐเชพ เชเซเชกเชฎเชพเช, เช
เชฎเซ เชฌเซ เชเชธเซเชเชฎ เชซเชเชเซเชถเชจ เชฌเชจเชพเชตเซเชถเซเช. เชเชพเชฐเซเชฏ regex_clean
เชเซ เชกเซเชเชพเชจเซ เชธเซเชเซเชจ เชเชฐเซ เชเซ เช
เชจเซ เชซเชเชเซเชถเชจเชจเซ เชเชชเชฏเซเช เชเชฐเซเชจเซ PATTERNS เชฏเชพเชฆเซเชจเชพ เชเชงเชพเชฐเซ เช
เชจเซเชฐเซเชช เชชเชเชเซเชคเชฟเชจเซ เชชเซเชจเชเชชเซเชฐเชพเชชเซเชค เชเชฐเซ เชเซ re.search
. เชซเชเชเซเชถเชจ เช
เชฒเซเชชเชตเชฟเชฐเชพเชฎเชฅเซ เช
เชฒเช เชเชฐเซเชฒเซ เชธเซเชเซเชฐเชฟเชเช เชชเชฐเชค เชเชฐเซ เชเซ. เชเซ เชคเชฎเซ เชจเชฟเชฏเชฎเชฟเชค เช
เชญเชฟเชตเซเชฏเชเซเชคเชฟ เชจเชฟเชทเซเชฃเชพเชค เชจเชฅเซ, เชคเซ เชนเซเช เช เชคเชชเชพเชธเชตเชพเชจเซ เชญเชฒเชพเชฎเชฃ เชเชฐเซเช เชเซเช datetime
เชคเซเชจเซ เชเชพเชฐเซเชฏ เชเชฐเชตเชพ เชฎเชพเชเซ เชเชพเชฐเซเชฏเชจเซ เช
เชเชฆเชฐ. เชฎเชจเซ เชซเชพเชเชฒเชจเซ เชถเชฐเซเชเชคเชฎเชพเช เชเช เชเชฏเชพเชค เชญเซเชฒ เชฎเชณเซ เชฐเชนเซ เชนเชคเซ, เชเซ เชตเชฟเชเชฟเชคเซเชฐ เชนเชคเซ. เช เชฏเชพเชฆเซ เชชเชเซ เชซเชเชเซเชถเชจเชฎเชพเช เชฎเซเชเชฒเชตเชพเชฎเชพเช เชเชตเซ เชเซ เชฌเชฟเชเชเซเชตเซเชฐเซ เชฒเชเซ, เชเซ เชซเชเซเชค เชเซเชฌเชฒเชฎเชพเช เชเชชเชฃเซ เชกเซเชเชพ เชเชฎเซเชฐเซ เชเซ. เชฌเซเช เชกเซเชเชพเชซเซเชฒเซ เชเซเชฌ เช
เชจเซ เชธเซเชเซเชฐเซเชฎเชฟเชเช เชกเซเชเชพเชซเซเชฒเซ เชเซเชฌ เชฎเชพเชเซเชจเซ เชเซเชก เชจเซเชเซ เชเชชเซเชฒ เชเซ. เชฌเซเช เช
เชจเซ เชธเซเชเซเชฐเซเชฎเชฟเชเช เชเซเชก เชตเชเซเชเซเชจเซ เชคเชซเชพเชตเชค เชเชเชฒเซ เช เชเซ เชเซ เชฌเซเชเชฎเชพเช เชเชชเชฃเซ CSV เชตเชพเชเชเซเช เชเซเช src_path
เชซเชเชเซเชถเชจเชจเซ เชเชชเชฏเซเช เชเชฐเซเชจเซ ReadFromText
เชฌเซเชฎ เชฎเชพเชเชฅเซ.
เชฌเซเช เชกเซเชเชพเชซเซเชฒเซ เชเซเชฌ (เชฌเซเช เชชเซเชฐเซเชธเซเชธเชฟเชเช)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
เชธเซเชเซเชฐเซเชฎเชฟเชเช เชกเซเชเชพเชซเซเชฒเซ เชเซเชฌ (เชธเซเชเซเชฐเซเชฎ เชชเซเชฐเซเชธเซเชธเชฟเชเช)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
เชเชจเซเชตเซเชฏเชฐ เชถเชฐเซ เชเชฐเซ เชฐเชนเซเชฏเชพ เชเซเช
เช เชฎเซ เชชเชพเชเชชเชฒเชพเชเชจเชจเซ เชตเชฟเชตเชฟเชง เชฐเซเชคเซ เชเชฒเชพเชตเซ เชถเชเซเช เชเซเช. เชเซ เช เชฎเซ เชเชเซเชเซเช เชคเซ, GCP เชฎเชพเช เชฆเซเชฐเชธเซเชฅ เชฐเซเชคเซ เชฒเซเช เชเชจ เชเชฐเชคเซ เชตเชเชคเซ เช เชฎเซ เชคเซเชจเซ เชเชฐเซเชฎเชฟเชจเชฒเชฅเซ เชธเซเชฅเชพเชจเชฟเช เชฐเซเชคเซ เชเชฒเชพเชตเซ เชถเชเซเช เชเซเช.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
เชเซ เชเซ, เช เชฎเซ เชกเซเชเชพเชซเซเชฒเซเชจเซ เชเชชเชฏเซเช เชเชฐเซเชจเซ เชคเซเชจเซ เชเชฒเชพเชตเชตเชพ เชเช เชฐเชนเซเชฏเชพ เชเซเช. เช เชฎเซ เชจเซเชเซเชจเชพ เชเชฐเซเชฐเซ เชชเชฐเชฟเชฎเชพเชฃเซ เชธเซเช เชเชฐเซเชจเซ เชจเซเชเซเชจเชพ เชเชฆเซเชถเชจเซ เชเชชเชฏเซเช เชเชฐเซเชจเซ เช เชเชฐเซ เชถเชเซเช เชเซเช.
project
โ เชคเชฎเชพเชฐเชพ GCP เชชเซเชฐเซเชเซเชเซเชเชจเซเช ID.runner
เชเช เชชเชพเชเชชเชฒเชพเชเชจ เชฐเชจเชฐ เชเซ เชเซ เชคเชฎเชพเชฐเชพ เชชเซเชฐเซเชเซเชฐเชพเชฎเชจเซเช เชตเชฟเชถเซเชฒเซเชทเชฃ เชเชฐเชถเซ เช เชจเซ เชคเชฎเชพเชฐเซ เชชเชพเชเชชเชฒเชพเชเชจเชจเซเช เชจเชฟเชฐเซเชฎเชพเชฃ เชเชฐเชถเซ. เชเซเชฒเชพเชเชกเชฎเชพเช เชเชฒเชพเชตเชตเชพ เชฎเชพเชเซ, เชคเชฎเชพเชฐเซ เชกเซเชเชพเชซเซเชฒเซเชฐเชจเชฐเชจเซ เชเชฒเซเชฒเซเช เชเชฐเชตเซ เชเชตเชถเซเชฏเช เชเซ.staging_location
โ เชเชพเชฐเซเชฏ เชเชฐเซ เชฐเชนเซเชฒเชพ เชชเซเชฐเซเชธเซเชธเชฐเซ เชฆเซเชตเชพเชฐเชพ เชเชฐเซเชฐเซ เชเชจเซเชกเซเชเซเชธเซเชเช เชเซเชก เชชเซเชเซเชเซ เชฎเชพเชเซ เชเซเชฒเชพเชเชก เชกเซเชเชพเชซเซเชฒเซ เชเซเชฒเชพเชเชก เชธเซเชเซเชฐเซเชเชจเซ เชฎเชพเชฐเซเช.temp_location
- เชเซเชฏเชพเชฐเซ เชชเชพเชเชชเชฒเชพเชเชจ เชเชพเชฒเซ เชฐเชนเซ เชนเซเชฏ เชคเซเชฏเชพเชฐเซ เชฌเชจเชพเชตเซเชฒ เชเชพเชฎเชเชฒเชพเช เชเซเชฌ เชซเชพเชเชฒเซเชจเซ เชธเซเชเซเชฐ เชเชฐเชตเชพ เชฎเชพเชเซ เชเซเชฒเชพเชเชก เชกเซเชเชพเชซเซเชฒเซ เชเซเชฒเชพเชเชก เชธเซเชเซเชฐเซเชเชจเซ เชชเชพเชฅ.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
เชเซเชฏเชพเชฐเซ เช เชเชฆเซเชถ เชเชพเชฒเซ เชฐเชนเซเชฏเซ เชนเซเชฏ, เชคเซเชฏเชพเชฐเซ เช เชฎเซ google เชเชจเซเชธเซเชฒเชฎเชพเช DataFlow เชเซเชฌ เชชเชฐ เชเช เชถเชเซเช เชเซเช เช เชจเซ เช เชฎเชพเชฐเซ เชชเชพเชเชชเชฒเชพเชเชจ เชเซเช เชถเชเซเช เชเซเช. เชเซเชฏเชพเชฐเซ เชเชชเชฃเซ เชชเชพเชเชชเชฒเชพเชเชจ เชชเชฐ เชเซเชฒเชฟเช เชเชฐเซเช เชเซเช, เชคเซเชฏเชพเชฐเซ เชเชชเชฃเซ เชเชเซเชคเชฟ 4 เชเซเชตเซเช เชเชเชเช เชเซเชตเซเช เชเซเชเช. เชกเซเชฌเชเซเชเช เชนเซเชคเซเช เชฎเชพเชเซ, เชตเชฟเชเชคเชตเชพเชฐ เชฒเซเช เชเซเชตเชพ เชฎเชพเชเซ เชฒเซเชเซเชธ เช เชจเซ เชชเชเซ เชธเซเชเซเชเชกเซเชฐเชพเชเชตเชฐ เชชเชฐ เชเชตเชพเชจเซเช เชเซเชฌ เช เชฎเชฆเชฆเชฐเซเชช เชฅเช เชถเชเซ เชเซ. เชเชจเชพเชฅเซ เชฎเชจเซ เช เชธเชเชเซเชฏ เชเซเชธเซเชฎเชพเช เชชเชพเชเชชเชฒเชพเชเชจ เชธเชฎเชธเซเชฏเชพเช เชเชเซเชฒเชตเชพเชฎเชพเช เชฎเชฆเชฆ เชฎเชณเซ เชเซ.
เชเชเซเชคเชฟ 4: เชฌเซเชฎ เชเชจเซเชตเซเชฏเชฐ
BigQuery เชฎเชพเช เช เชฎเชพเชฐเซ เชกเซเชเชพ เชเชเซเชธเซเชธ เชเชฐเซ
เชคเซเชฅเซ, เช เชฎเชพเชฐเซ เชชเชพเชธเซ เชชเชนเซเชฒเชพเชฅเซ เช เชเช เชชเชพเชเชชเชฒเชพเชเชจ เชนเซเชตเซ เชเซเชเช เชเซเชฎเชพเช เช เชฎเชพเชฐเชพ เชเซเชทเซเชเชเชฎเชพเช เชกเซเชเชพ เชตเชนเซเชคเซ เชนเซเชฏ. เชเชจเซ เชเชเชพเชธเชตเชพ เชฎเชพเชเซ, เช เชฎเซ BigQuery เชชเชฐ เชเชเชจเซ เชกเซเชเชพ เชเซเช เชถเชเซเช เชเซเช. เชจเซเชเซเชจเชพ เชเชฆเซเชถเชจเซ เชเชชเชฏเซเช เชเชฐเซเชฏเชพ เชชเชเซ เชคเชฎเชพเชฐเซ เชกเซเชเชพเชธเซเชเชจเซ เชชเซเชฐเชฅเชฎ เชเซเชเชฒเซเช เชชเชเชเซเชคเชฟเช เชเซเชตเซ เชเซเชเช. เชนเชตเซ เชเซเชฏเชพเชฐเซ เช เชฎเชพเชฐเซ เชชเชพเชธเซ BigQuery เชฎเชพเช เชกเซเชเชพ เชธเชเชเซเชฐเชนเชฟเชค เชเซ, เชคเซเชฏเชพเชฐเซ เช เชฎเซ เชตเชงเซ เชตเชฟเชถเซเชฒเซเชทเชฃ เชเชฐเซ เชถเชเซเช เชเซเช, เชคเซเชฎเช เชธเชพเชฅเซเชฆเชพเชฐเซ เชธเชพเชฅเซ เชกเซเชเชพ เชถเซเชฐ เชเชฐเซ เชถเชเซเช เชเซเช เช เชจเซ เชตเซเชฏเชตเชธเชพเชฏเชฟเช เชชเซเชฐเชถเซเชจเซเชจเชพ เชเชตเชพเชฌ เชเชชเชตเชพเชจเซเช เชถเชฐเซ เชเชฐเซ เชถเชเซเช เชเซเช.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
เชเชเซเชคเชฟ 5: BigQuery
เชจเชฟเชทเซเชเชฐเซเชท
เช เชฎเซ เชเชถเชพ เชฐเชพเชเซเช เชเซเช เชเซ เช เชชเซเชธเซเช เชธเซเชเซเชฐเซเชฎเชฟเชเช เชกเซเชเชพ เชชเชพเชเชชเชฒเชพเชเชจ เชฌเชจเชพเชตเชตเชพ เชคเซเชฎเช เชกเซเชเชพเชจเซ เชตเชงเซ เชธเซเชฒเชญ เชฌเชจเชพเชตเชตเชพเชจเชพ เชฎเชพเชฐเซเชเซ เชถเซเชงเชตเชพเชจเชพ เชเช เชเชชเชฏเซเชเซ เชเชฆเชพเชนเชฐเชฃ เชคเชฐเซเชเซ เชธเซเชตเชพ เชเชชเชถเซ. เช เชซเซเชฐเซเชฎเซเชเชฎเชพเช เชกเซเชเชพ เชธเซเชเซเชฐ เชเชฐเชตเชพเชฅเซ เช เชฎเชจเซ เชเชฃเชพ เชซเชพเชฏเชฆเชพ เชฎเชณเซ เชเซ. เชนเชตเซ เชเชชเชฃเซ เชฎเชนเชคเซเชตเชชเซเชฐเซเชฃ เชชเซเชฐเชถเซเชจเซเชจเชพ เชเชตเชพเชฌ เชเชชเชตเชพเชจเซเช เชถเชฐเซ เชเชฐเซ เชถเชเซเช เชเซเช เชเซเชฎ เชเซ เชเซเชเชฒเชพ เชฒเซเชเซ เช เชฎเชพเชฐเซ เชชเซเชฐเซเชกเชเซเชเชจเซ เชเชชเชฏเซเช เชเชฐเซ เชฐเชนเซเชฏเชพ เชเซ? เชถเซเช เชคเชฎเชพเชฐเซ เชตเชชเชฐเชพเชถเชเชฐเซเชคเชพ เชเชงเชพเชฐ เชธเชฎเชฏ เชธเชพเชฅเซ เชตเชงเซ เชฐเชนเซเชฏเซ เชเซ? เชฒเซเชเซ เชเชคเซเชชเชพเชฆเชจเชจเชพ เชเชฏเชพ เชชเชพเชธเชพเช เชธเชพเชฅเซ เชธเซเชฅเซ เชตเชงเซ เชเซเชฐเชฟเชฏเชพเชชเซเชฐเชคเชฟเชเซเชฐเชฟเชฏเชพ เชเชฐเซ เชเซ? เช เชจเซ เชถเซเช เชเชตเซ เชญเซเชฒเซ เชเซ เชเซเชฏเชพเช เชจ เชนเซเชตเซ เชเซเชเช? เช เชเชตเชพ เชชเซเชฐเชถเซเชจเซ เชเซ เชเซ เชธเชเชธเซเชฅเชพเชจเชพ เชนเชฟเชคเชจเชพ เชฐเชนเซเชถเซ. เช เชชเซเชฐเชถเซเชจเซเชจเชพ เชเชตเชพเชฌเซเชฎเชพเชเชฅเซ เชเชฆเซเชฆเชญเชตเชคเซ เชเชเชคเชฐเชฆเซเชทเซเชเชฟเชจเชพ เชเชงเชพเชฐเซ, เช เชฎเซ เชเชคเซเชชเชพเชฆเชจเชจเซ เชธเซเชงเชพเชฐเซ เชถเชเซเช เชเซเช เช เชจเซ เชตเชชเชฐเชพเชถเชเชฐเซเชคเชพเชจเซ เชธเชเชพเช เชตเชงเชพเชฐเซ เชถเชเซเช เชเซเช.
เชฌเซเชฎ เช เชชเซเชฐเชเชพเชฐเชจเซ เชเชธเชฐเชค เชฎเชพเชเซ เชเชฐเซเชเชฐ เชเชชเชฏเซเชเซ เชเซ เช เชจเซ เชคเซเชจเชพ เชเชชเชฏเซเชเชจเชพ เช เชจเซเชฏ เชฐเชธเชชเซเชฐเชฆ เชเชฟเชธเซเชธเชพเช เชชเชฃ เชเซ. เชเชฆเชพเชนเชฐเชฃ เชคเชฐเซเชเซ, เชคเชฎเซ เชฐเซเช เชฒ เชเชพเชเชฎเชฎเชพเช เชธเซเชเซเช เชเชฟเช เชกเซเชเชพเชจเซเช เชตเชฟเชถเซเชฒเซเชทเชฃ เชเชฐเชตเชพ เช เชจเซ เชตเชฟเชถเซเชฒเซเชทเชฃเชจเชพ เชเชงเชพเชฐเซ เชธเซเชฆเชพ เชเชฐเชตเชพ เชฎเชพเชเซ เชเซ, เชเชฆเชพเช เชคเชฎเชพเชฐเซ เชชเชพเชธเซ เชตเชพเชนเชจเซเชฎเชพเชเชฅเซ เชเชตเชคเชพ เชธเซเชจเซเชธเชฐ เชกเซเชเชพ เชเซ เช เชจเซ เชคเชฎเซ เชเซเชฐเชพเชซเชฟเช เชธเซเชคเชฐเชจเซ เชเชฃเชคเชฐเซเชจเซ เชเชฃเชคเชฐเซ เชเชฐเชตเชพ เชฎเชพเชเซ เชเซ. เชคเชฎเซ, เชเชฆเชพเชนเชฐเชฃ เชคเชฐเซเชเซ, เชเช เชเซเชฎเชฟเชเช เชเชเชชเชจเซ เชชเชฃ เชฌเชจเซ เชถเชเซ เชเซ เชเซ เชตเชชเชฐเชพเชถเชเชฐเซเชคเชพเชจเซ เชกเซเชเชพ เชเชเชคเซเชฐเชฟเชค เชเชฐเซ เชเซ เช เชจเซ เชฎเซเชเซเชฏ เชฎเซเชเซเชฐเชฟเชเซเชธเชจเซ เชเซเชฐเซ เช เชเชฐเชตเชพ เชฎเชพเชเซ เชกเซเชถเชฌเซเชฐเซเชกเซเชธ เชฌเชจเชพเชตเชตเชพ เชฎเชพเชเซ เชคเซเชจเซ เชเชชเชฏเซเช เชเชฐเซ เชเซ. เช เซเช เชเซ, เชธเชเซเชเชจเซ, เช เชฌเซเชเซ เชชเซเชธเซเช เชฎเชพเชเซเชจเซ เชตเชฟเชทเชฏ เชเซ, เชตเชพเชเชเชตเชพ เชฌเชฆเชฒ เชเชญเชพเชฐ, เช เชจเซ เชเซเช เชธเชเชชเซเชฐเซเชฃ เชเซเชก เชเซเชตเชพ เชฎเชพเชเซ เชเซ เชคเซเชฎเชจเชพ เชฎเชพเชเซ เชจเซเชเซ เชฎเชพเชฐเชพ GitHub เชจเซ เชฒเชฟเชเช เชเซ.
เชฌเชธ.
เชธเซเชฐเซเชธ: www.habr.com