เจธเจพเจฐเจฟเจเจ เจจเฉเฉฐ เจธเจคเจฟ เจธเจผเฉเจฐเฉ เจ
เจเจพเจฒ. เจ
เจธเฉเจ เจฒเฉเจ เจฆเฉ เจ
เฉฐเจคเจฎ เจนเจฟเฉฑเจธเฉ เจฆเจพ เจ
เจจเฉเจตเจพเจฆ เจธเจพเจเจเจพ เจเจฐ เจฐเจนเฉ เจนเจพเจ, เจเจพเจธ เจคเฉเจฐ 'เจคเฉ เจเฉเจฐเจธ เจฆเฉ เจตเจฟเจฆเจฟเจเจฐเจฅเฉเจเจ เจฒเจ เจคเจฟเจเจฐ เจเฉเจคเจพ เจเจฟเจ เจนเฉเฅค
เจฐเฉเจ เจฒ-เจเจพเจเจฎ เจชเจพเจเจชเจฒเจพเจเจจเจพเจ เจฒเจ เจ เจชเจพเจเฉ เจฌเฉเจฎ เจ เจคเฉ เจกเฉเจเจพเจซเจฒเฉ
Google เจเจฒเจพเจเจก เจธเฉเฉฑเจเจ เฉฑเจช เจเฉเจคเจพ เจเจพ เจฐเจฟเจนเจพ เจนเฉ
เจจเฉเจ: เจฎเฉเจ เจชเจพเจเจชเจฒเจพเจเจจ เจจเฉเฉฐ เจเจฒเจพเจเจฃ เจ เจคเฉ เจเจธเจเจฎ เจฒเฉเจ เจกเฉเจเจพ เจจเฉเฉฐ เจชเฉเจฐเจเจพเจธเจผเจฟเจค เจเจฐเจจ เจฒเจ เจเฉเจเจฒ เจเจฒเจพเจเจก เจธเจผเฉเฉฑเจฒ เจฆเฉ เจตเจฐเจคเฉเจ เจเฉเจคเฉ เจเจฟเจเจเจเจฟ เจฎเฉเจจเฉเฉฐ เจชเจพเจเจฅเจจ 3 เจตเจฟเฉฑเจ เจชเจพเจเจชเจฒเจพเจเจจ เจเจฒเจพเจเจฃ เจตเจฟเฉฑเจ เจฎเฉเจธเจผเจเจฒ เจ เจฐเจนเฉ เจธเฉเฅค เจเฉเจเจฒ เจเจฒเจพเจเจก เจธเจผเฉเฉฑเจฒ เจชเจพเจเจฅเจจ 2 เจฆเฉ เจตเจฐเจคเฉเจ เจเจฐเจฆเจพ เจนเฉ, เจเฉ เจเจฟ เจ เจชเจพเจเฉ เจฌเฉเจฎ เจจเจพเจฒ เจตเจงเฉเจฐเฉ เจ เจจเฉเจเฉเจฒ เจนเฉเฅค
เจชเจพเจเจชเจฒเจพเจเจจ เจธเจผเฉเจฐเฉ เจเจฐเจจ เจฒเจ, เจธเจพเจจเฉเฉฐ เจธเฉเจเจฟเฉฐเจเจพเจ เจตเจฟเฉฑเจ เจฅเฉเฉเจพ เจเจฟเจนเจพ เจเฉเจฆเจฃ เจฆเฉ เจฒเฉเฉ เจนเฉเฅค เจคเฉเจนเจพเจกเฉ เจตเจฟเฉฑเจเฉเจ เจเจฟเจจเฉเจนเจพเจ เจจเฉ เจชเจนเจฟเจฒเจพเจ GCP เจฆเฉ เจตเจฐเจคเฉเจ เจจเจนเฉเจ เจเฉเจคเฉ เจนเฉ, เจคเฉเจนเจพเจจเฉเฉฐ เจเจธ เจตเจฟเฉฑเจ เจฆเฉฑเจธเฉ เจเจ เจนเฉเจ เจพเจ เจฆเจฟเฉฑเจคเฉ 6 เจเจฆเจฎเจพเจ เจฆเฉ เจชเจพเจฒเจฃเจพ เจเจฐเจจ เจฆเฉ เจฒเฉเฉ เจนเฉเจตเฉเจเฉ
เจเจธ เจคเฉเจ เจฌเจพเจ
เจฆ, เจธเจพเจจเฉเฉฐ เจเจชเจฃเฉเจเจ เจธเจเฉเจฐเจฟเจชเจเจพเจ เจจเฉเฉฐ เจเฉเจเจฒ เจเจฒเจพเจเจก เจธเจเฉเจฐเฉเจ 'เจคเฉ เจ
เจชเจฒเฉเจก เจเจฐเจจ เจ
เจคเฉ เจเจนเจจเจพเจ เจจเฉเฉฐ เจธเจพเจกเฉ เจเฉเจเจฒ เจเจฒเจพเจเจก เจธเจผเฉเจฒ 'เจคเฉ เจเจพเจชเฉ เจเจฐเจจ เจฆเฉ เจเจผเจฐเฉเจฐเจค เจนเฉเจเจเฉเฅค เจเจฒเจพเจเจก เจธเจเฉเจฐเฉเจ 'เจคเฉ เจ
เจชเจฒเฉเจก เจเจฐเจจเจพ เจฌเจนเฉเจค เจฎเจพเจฎเฉเจฒเฉ เจนเฉ (เจเฉฑเจ เจตเฉเจฐเจตเจพ เจชเจพเจเจ เจเจพ เจธเจเจฆเจพ เจนเฉ
2 เจเจฟเฉฑเจคเจฐ
เจซเจพเจเจฒเจพเจ เจฆเฉ เจจเจเจฒ เจเจฐเจจ เจ เจคเฉ เจฒเฉเฉเฉเจเจฆเฉเจเจ เจฒเจพเจเจฌเฉเจฐเฉเจฐเฉเจเจ เจจเฉเฉฐ เจธเจฅเจพเจชเจฟเจค เจเจฐเจจ เจฒเจ เจธเจพเจจเฉเฉฐ เจฒเฉเฉเฉเจเจฆเฉเจเจ เจเจฎเจพเจเจกเจพเจ เจนเฉเจ เจพเจ เจธเฉเจเฉเจฌเฉฑเจง เจนเจจเฅค
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
เจธเจพเจกเจพ เจกเจพเจเจพเจฌเฉเจธ เจ เจคเฉ เจธเจพเจฐเจฃเฉ เจฌเจฃเจพเจเจฃเจพ
เจเฉฑเจ เจตเจพเจฐ เจเจฆเฉเจ เจ
เจธเฉเจ เจธเฉเฉฑเจเจ
เฉฑเจช เจธเฉฐเจฌเฉฐเจงเฉ เจธเจพเจฐเฉ เจชเฉเจพเจ
เจชเฉเจฐเฉ เจเจฐ เจฒเฉเจเจฆเฉ เจนเจพเจ, เจคเจพเจ เจ
เจเจฒเฉ เจเฉเจเจผ เจเฉ เจธเจพเจจเฉเฉฐ เจเจฐเจจ เจฆเฉ เจฒเฉเฉ เจนเฉ เจเจน เจนเฉ BigQuery เจตเจฟเฉฑเจ เจเฉฑเจ เจกเจพเจเจพเจธเฉเจ เจ
เจคเฉ เจธเจพเจฐเจฃเฉ เจฌเจฃเจพเจเจฃเจพเฅค เจ
เจเจฟเจนเจพ เจเจฐเจจ เจฆเฉ เจเจ เจคเจฐเฉเจเฉ เจนเจจ, เจชเจฐ เจธเจญ เจคเฉเจ เจธเจฐเจฒ เจนเฉ เจชเจนเจฟเจฒเจพเจ เจกเฉเจเจพเจธเฉเจ เจฌเจฃเจพ เจเฉ เจเฉเจเจฒ เจเจฒเจพเจเจก เจเฉฐเจธเฉเจฒ เจฆเฉ เจตเจฐเจคเฉเจ เจเจฐเจจเจพเฅค เจคเฉเจธเฉเจ เจนเฉเจ เจพเจ เจฆเจฟเฉฑเจคเฉ เจเจฆเจฎเจพเจ เจฆเฉ เจชเจพเจฒเจฃเจพ เจเจฐ เจธเจเจฆเฉ เจนเฉ
เจเจฟเฉฑเจคเจฐ 3. เจเฉเจฌเจฒ เจฒเฉเจเจเจ
เจเจชเจญเฉเจเจคเจพ เจฒเฉเจ เจกเฉเจเจพ เจชเฉเจฐเจเจพเจธเจผเจฟเจค เจเฉเจคเจพ เจเจพ เจฐเจฟเจนเจพ เจนเฉ
Pub/Sub เจธเจพเจกเฉ เจชเจพเจเจชเจฒเจพเจเจจ เจฆเจพ เจเฉฑเจ เจฎเจนเฉฑเจคเจตเจชเฉเจฐเจจ เจนเจฟเฉฑเจธเจพ เจนเฉ เจเจฟเจเจเจเจฟ เจเจน เจเจ เจธเฉเจคเฉฐเจคเจฐ เจเจชเจฒเฉเจเฉเจธเจผเจจเจพเจ เจจเฉเฉฐ เจเฉฑเจ เจฆเฉเจเฉ เจจเจพเจฒ เจธเฉฐเจเจพเจฐ เจเจฐเจจ เจฆเฉ เจเจเจพเจเจผเจค เจฆเจฟเฉฐเจฆเจพ เจนเฉเฅค เจเจพเจธ เจคเฉเจฐ 'เจคเฉ, เจเจน เจเฉฑเจ เจตเจฟเจเฉเจฒเฉ เจตเจเฉเจ เจเฉฐเจฎ เจเจฐเจฆเจพ เจนเฉ เจเฉ เจธเจพเจจเฉเฉฐ เจเจชเจฒเฉเจเฉเจธเจผเจจเจพเจ เจตเจฟเจเจเจพเจฐ เจธเฉฐเจฆเฉเจธเจผ เจญเฉเจเจฃ เจ เจคเฉ เจชเฉเจฐเจพเจชเจค เจเจฐเจจ เจฆเฉ เจเจเจพเจเจผเจค เจฆเจฟเฉฐเจฆเจพ เจนเฉเฅค เจธเจญ เจคเฉเจ เจชเจนเจฟเจฒเจพเจ เจธเจพเจจเฉเฉฐ เจเฉฑเจ เจตเจฟเจธเจผเจพ เจฌเจฃเจพเจเจฃ เจฆเฉ เจฒเฉเฉ เจนเฉเฅค เจฌเจธ เจเฉฐเจธเฉเจฒ เจตเจฟเฉฑเจ เจชเจฌ/เจธเจฌ 'เจคเฉ เจเจพเจ เจ เจคเฉ เจตเจฟเจธเจผเจพ เจฌเจฃเจพเจ 'เจคเฉ เจเจฒเจฟเฉฑเจ เจเจฐเฉเฅค
เจนเฉเจ เจพเจ เจฆเจฟเฉฑเจคเจพ เจเฉเจก เจเฉฑเจชเจฐ เจชเจฐเจฟเจญเจพเจธเจผเจฟเจค เจฒเฉเจ เจกเฉเจเจพ เจฌเจฃเจพเจเจฃ เจฒเจ เจธเจพเจกเฉ เจธเจเฉเจฐเจฟเจชเจ เจจเฉเฉฐ เจเจพเจฒ เจเจฐเจฆเจพ เจนเฉ เจ
เจคเฉ เจซเจฟเจฐ เจฒเฉเจเจธ เจจเฉเฉฐ Pub/Sub เจจเฉเฉฐ เจเฉเฉเจฆเจพ เจนเฉ เจ
เจคเฉ เจญเฉเจเจฆเจพ เจนเฉเฅค เจธเจพเจจเฉเฉฐ เจธเจฟเจฐเจซ เจเฉฑเจ เจเฉเจเจผ เจฌเจฃเจพเจเจฃ เจฆเฉ เจฒเฉเฉ เจนเฉ เจชเฉเจฐเจเจพเจธเจผเจ เจเจฒเจพเจเฉฐเจ, เจตเจฟเจงเฉ เจฆเฉ เจตเจฐเจคเฉเจ เจเจฐเจเฉ เจตเจฟเจธเจผเฉ เจฆเจพ เจฎเจพเจฐเจ เจฆเจฟเจ topic_path
เจ
เจคเฉ เจซเฉฐเจเจธเจผเจจ เจจเฉเฉฐ เจเจพเจฒ เจเจฐเฉ publish
ั topic_path
เจ
เจคเฉ เจกเจพเจเจพเฅค เจเจฟเจฐเจชเจพ เจเจฐเจเฉ เจงเจฟเจเจจ เจฆเจฟเจ เจเจฟ เจ
เจธเฉเจ เจเจฏเจพเจค เจเจฐเจฆเฉ เจนเจพเจ generate_log_line
เจธเจพเจกเฉ เจธเจเฉเจฐเจฟเจชเจ เจคเฉเจ stream_logs
, เจเจธ เจฒเจ เจฏเจเฉเจจเฉ เจฌเจฃเจพเจ เจเจฟ เจเจน เจซเจพเจเจฒเจพเจ เจเฉฑเจเฉ เจซเฉเจฒเจกเจฐ เจตเจฟเฉฑเจ เจนเจจ, เจจเจนเฉเจ เจคเจพเจ เจคเฉเจนเจพเจจเฉเฉฐ เจเฉฑเจ เจเจฏเจพเจค เจเจฒเจคเฉ เจฎเจฟเจฒเฉเจเฉเฅค เจซเจฟเจฐ เจ
เจธเฉเจ เจเจธเจจเฉเฉฐ เจเจชเจฃเฉ เจเฉเจเจฒ เจเฉฐเจธเฉเจฒ เจฆเฉเจเจฐเจพ เจตเจฐเจค เจเฉ เจเจฒเจพ เจธเจเจฆเฉ เจนเจพเจ:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
เจเจฟเจตเฉเจ เจนเฉ เจซเจพเจเจฒ เจเฉฑเจฒเจฆเฉ เจนเฉ, เจ เจธเฉเจ เจเฉฐเจธเฉเจฒ เจตเจฟเฉฑเจ เจฒเฉเจ เจกเฉเจเจพ เจฆเฉ เจเจเจเจชเฉเฉฑเจ เจจเฉเฉฐ เจตเฉเจเจฃ เจฆเฉ เจฏเฉเจ เจนเฉเจตเจพเจเจเฉ, เจเจฟเจตเฉเจ เจเจฟ เจนเฉเจ เจพเจ เจเจฟเฉฑเจคเจฐ เจตเจฟเฉฑเจ เจฆเจฟเจเจพเจเจ เจเจฟเจ เจนเฉเฅค เจเจน เจธเจเฉเจฐเจฟเจชเจ เจเจฆเฉเจ เจคเฉฑเจ เจเฉฐเจฎ เจเจฐเฉเจเฉ เจเจฆเฉเจ เจคเฉฑเจ เจ เจธเฉเจ เจเจธเจฆเฉ เจตเจฐเจคเฉเจ เจจเจนเฉเจ เจเจฐเจฆเฉ CTRL + Cเจเจธ เจจเฉเฉฐ เจชเฉเจฐเจพ เจเจฐเจจ เจฒเจ.
เจเจฟเฉฑเจคเจฐ 4. เจเจเจเจชเฉเฉฑเจ publish_logs.py
เจธเจพเจกเจพ เจชเจพเจเจชเจฒเจพเจเจจ เจเฉเจก เจฒเจฟเจเจฃเจพ
เจนเฉเจฃ เจเจฆเฉเจ เจธเจพเจกเฉ เจเฉเจฒ เจธเจญ เจเฉเจ เจคเจฟเจเจฐ เจนเฉ, เจ
เจธเฉเจ เจฎเจเจผเฉเจฆเจพเจฐ เจนเจฟเฉฑเจธเจพ เจธเจผเฉเจฐเฉ เจเจฐ เจธเจเจฆเฉ เจนเจพเจ - เจฌเฉเจฎ เจ
เจคเฉ เจชเจพเจเจฅเจจ เจฆเฉ เจตเจฐเจคเฉเจ เจเจฐเจเฉ เจธเจพเจกเฉ เจชเจพเจเจชเจฒเจพเจเจจ เจจเฉเฉฐ เจเฉเจกเจฟเฉฐเจ เจเจฐเจจเจพเฅค เจเฉฑเจ เจฌเฉเจฎ เจชเจพเจเจชเจฒเจพเจเจจ เจฌเจฃเจพเจเจฃ เจฒเจ, เจธเจพเจจเฉเฉฐ เจเฉฑเจ เจชเจพเจเจชเจฒเจพเจเจจ เจเจฌเจเฉเจเจ (เจชเฉ) เจฌเจฃเจพเจเจฃ เจฆเฉ เจฒเฉเฉ เจนเฉ. เจเฉฑเจ เจตเจพเจฐ เจเจฆเฉเจ เจ
เจธเฉเจ เจชเจพเจเจชเจฒเจพเจเจจ เจเจฌเจเฉเจเจ เจฌเจฃเจพ เจฒเฉเจเจฆเฉ เจนเจพเจ, เจคเจพเจ เจ
เจธเฉเจ เจเจชเจฐเฉเจเจฐ เจฆเฉ เจตเจฐเจคเฉเจ เจเจฐเจเฉ เจเฉฑเจ เจคเฉเจ เจฌเจพเจ
เจฆ เจเฉฑเจ เจเจ เจซเฉฐเจเจธเจผเจจเจพเจ เจจเฉเฉฐ เจฒเจพเจเฉ เจเจฐ เจธเจเจฆเฉ เจนเจพเจ pipe (|)
. เจเจฎ เจคเฉเจฐ 'เจคเฉ, เจตเจฐเจเจซเจฒเฉ เจนเฉเจ เจพเจ เจฆเจฟเฉฑเจคเฉ เจคเจธเจตเฉเจฐ เจฆเฉ เจคเจฐเฉเจนเจพเจ เจฆเจฟเจเจพเจ เจฆเจฟเฉฐเจฆเจพ เจนเฉเฅค
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
เจธเจพเจกเฉ เจเฉเจก เจตเจฟเฉฑเจ, เจ
เจธเฉเจ เจฆเฉ เจเจธเจเจฎ เจซเฉฐเจเจธเจผเจจ เจฌเจฃเจพเจตเจพเจเจเฉเฅค เจซเฉฐเจเจธเจผเจจ regex_clean
, เจเฉ เจกเฉเจเจพ เจจเฉเฉฐ เจธเจเฉเจจ เจเจฐเจฆเจพ เจนเฉ เจ
เจคเฉ เจซเฉฐเจเจธเจผเจจ เจฆเฉ เจตเจฐเจคเฉเจ เจเจฐเจฆเฉ เจนเฉเจ PATTERNS เจธเฉเจเฉ เจฆเฉ เจ
เจงเจพเจฐ เจคเฉ เจธเฉฐเจฌเฉฐเจงเจฟเจค เจเจคเจพเจฐ เจจเฉเฉฐ เจฎเฉเฉ เจชเฉเจฐเจพเจชเจค เจเจฐเจฆเจพ เจนเฉ re.search
. เจซเฉฐเจเจธเจผเจจ เจเจพเจฎเฉ เจจเจพเจฒ เจตเฉฑเจ เจเฉเจคเฉ เจธเจคเจฐ เจตเจพเจชเจธ เจเจฐเจฆเจพ เจนเฉเฅค เจเฉ เจคเฉเจธเฉเจ เจจเจฟเจฏเจฎเจค เจธเจฎเฉเจเจฐเจจ เจฎเจพเจนเจฐ เจจเจนเฉเจ เจนเฉ, เจคเจพเจ เจฎเฉเจ เจเจธเจฆเฉ เจเจพเจเจ เจเจฐเจจ เจฆเฉ เจธเจฟเจซเจผเจพเจฐเจฟเจธเจผ เจเจฐเจฆเจพ เจนเจพเจ datetime
เจเจธ เจจเฉเฉฐ เจเฉฐเจฎ เจเจฐเจจ เจฒเจ เจเฉฑเจ เจซเฉฐเจเจธเจผเจจ เจฆเฉ เจ
เฉฐเจฆเจฐ. เจฎเฉเจจเฉเฉฐ เจซเจพเจเจฒ เจฆเฉ เจธเจผเฉเจฐเฉเจเจค เจตเจฟเฉฑเจ เจเฉฑเจ เจเจฏเจพเจค เจเจฒเจคเฉ เจฎเจฟเจฒ เจฐเจนเฉ เจธเฉ, เจเฉ เจเจฟ เจ
เจเฉเจฌ เจธเฉ. เจเจน เจธเฉเจเฉ เจซเจฟเจฐ เจซเฉฐเจเจธเจผเจจ เจจเฉเฉฐ เจชเจพเจธ เจเฉเจคเฉ เจเจพเจเจฆเฉ เจนเฉ เจฐเจพเจเจ เจเฉ เจฌเจฟเจเจเฉเจเจฐเฉ, เจเฉ เจธเจฟเจฐเจซเจผ เจธเจพเจกเฉ เจกเฉเจเจพ เจจเฉเฉฐ เจธเจพเจฐเจฃเฉ เจตเจฟเฉฑเจ เจเฉเฉเจฆเจพ เจนเฉเฅค เจฌเฉเจ เจกเฉเจเจพเจซเจฒเฉ เจเฉเจฌ เจ
เจคเฉ เจธเจเฉเจฐเฉเจฎเจฟเฉฐเจ เจกเฉเจเจพเจซเจฒเฉ เจเฉเจฌ เจฒเจ เจเฉเจก เจนเฉเจ เจพเจ เจฆเจฟเฉฑเจคเจพ เจเจฟเจ เจนเฉเฅค เจฌเฉเจ เจ
เจคเฉ เจธเจเฉเจฐเฉเจฎเจฟเฉฐเจ เจเฉเจก เจตเจฟเฉฑเจ เจธเจฟเจฐเจซ เจซเจฐเจ เจเจน เจนเฉ เจเจฟ เจฌเฉเจ เจตเจฟเฉฑเจ เจ
เจธเฉเจ CSV เจจเฉเฉฐ เจชเฉเฉเจนเจฆเฉ เจนเจพเจ src_path
เจซเฉฐเจเจธเจผเจจ เจฆเฉ เจตเจฐเจคเฉเจ เจเจฐเจฆเฉ เจนเฉเจ ReadFromText
เจฌเฉเจฎ เจคเฉเจเฅค
เจฌเฉเจ เจกเฉเจเจพเจซเจฒเฉ เจเฉเจฌ (เจฌเฉเจ เจชเฉเจฐเฉเจธเฉเจธเจฟเฉฐเจ)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
เจธเจเฉเจฐเฉเจฎเจฟเฉฐเจ เจกเฉเจเจพเจซเจฒเฉ เจเฉเจฌ (เจธเจเฉเจฐเฉเจฎ เจชเฉเจฐเฉเจธเฉเจธเจฟเฉฐเจ)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
เจชเจพเจเจชเจฒเจพเจเจจ เจเจฒเจพเจ เจเจพ เจฐเจนเฉ เจนเฉ
เจ เจธเฉเจ เจชเจพเจเจชเจฒเจพเจเจจ เจจเฉเฉฐ เจเจ เจตเฉฑเจ-เจตเฉฑเจ เจคเจฐเฉเจเจฟเจเจ เจจเจพเจฒ เจเจฒเจพ เจธเจเจฆเฉ เจนเจพเจเฅค เจเฉเจเจฐ เจ เจธเฉเจ เจเจพเจนเฉเฉฐเจฆเฉ เจนเจพเจ, เจคเจพเจ เจ เจธเฉเจ เจเจธเจจเฉเฉฐ เจธเจฟเจฐเจซเจผ GCP เจตเจฟเฉฑเจ เจฐเจฟเจฎเฉเจเจฒเฉ เจฒเฉเจเจเจจ เจเจฐเจฆเฉ เจนเฉเจ เจเฉฑเจ เจเจฐเจฎเฉเจจเจฒ เจคเฉเจ เจธเจฅเจพเจจเจ เจคเฉเจฐ 'เจคเฉ เจเจฒเจพ เจธเจเจฆเฉ เจนเจพเจเฅค
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
เจนเจพเจฒเจพเจเจเจฟ, เจ เจธเฉเจ เจเจธเจจเฉเฉฐ เจกเฉเจเจพเจซเจฒเฉ เจฆเฉ เจตเจฐเจคเฉเจ เจเจฐเจเฉ เจเจฒเจพเจเจฃ เจเจพ เจฐเจนเฉ เจนเจพเจเฅค เจ เจธเฉเจ เจนเฉเจ เจพเจ เจฆเจฟเฉฑเจคเฉ เจฒเฉเฉเฉเจเจฆเฉ เจชเฉเจฐเจพเจฎเฉเจเจฐเจพเจ เจจเฉเฉฐ เจธเฉเฉฑเจ เจเจฐเจเฉ เจนเฉเจ เจพเจ เจฆเจฟเฉฑเจคเฉ เจเจฎเจพเจเจก เจฆเฉ เจตเจฐเจคเฉเจ เจเจฐเจเฉ เจ เจเจฟเจนเจพ เจเจฐ เจธเจเจฆเฉ เจนเจพเจเฅค
project
โ เจคเฉเจนเจพเจกเฉ GCP เจชเฉเจฐเฉเจเฉเจเจ เจฆเฉ IDเฅคrunner
เจเฉฑเจ เจชเจพเจเจชเจฒเจพเจเจจ เจฆเฉเฉเจพเจ เจนเฉ เจเฉ เจคเฉเจนเจพเจกเฉ เจชเฉเจฐเฉเจเจฐเจพเจฎ เจฆเจพ เจตเจฟเจธเจผเจฒเฉเจธเจผเจฃ เจเจฐเฉเจเจพ เจ เจคเฉ เจคเฉเจนเจพเจกเฉ เจชเจพเจเจชเจฒเจพเจเจจ เจฆเจพ เจจเจฟเจฐเจฎเจพเจฃ เจเจฐเฉเจเจพเฅค เจเจฒเจพเจเจก เจตเจฟเฉฑเจ เจเจฒเจพเจเจฃ เจฒเจ, เจคเฉเจนเจพเจจเฉเฉฐ เจเฉฑเจ DataflowRunner เจจเจฟเจฐเจงเจพเจฐเจฟเจค เจเจฐเจจเจพ เจเจพเจนเฉเจฆเจพ เจนเฉเฅคstaging_location
โ เจเฉฐเจฎ เจเจฐเจจ เจตเจพเจฒเฉ เจชเฉเจฐเฉเจธเฉเจธเจฐเจพเจ เจฆเฉเจเจฐเจพ เจฒเฉเฉเฉเจเจฆเฉ เจเฉเจก เจชเฉเจเฉเจเจพเจ เจจเฉเฉฐ เจเฉฐเจกเฉเจเจธ เจเจฐเจจ เจฒเจ เจเจฒเจพเจเจก เจกเฉเจเจพเจซเจฒเฉ เจเจฒเจพเจเจก เจธเจเฉเจฐเฉเจ เจฆเจพ เจฎเจพเจฐเจเฅคtemp_location
โ เจชเจพเจเจชเจฒเจพเจเจจ เจฆเฉ เจเฉฑเจฒเจฃ เจฆเฉเจฐเจพเจจ เจฌเจฃเจพเจเจเจ เจเจเจเจ เจ เจธเจฅเจพเจ เจจเฉเจเจฐเฉเจเจ เจฆเฉเจเจ เจซเจพเจเจฒเจพเจ เจจเฉเฉฐ เจธเจเฉเจฐ เจเจฐเจจ เจฒเจ เจเจฒเจพเจเจก เจกเฉเจเจพเจซเจฒเฉ เจเจฒเจพเจเจก เจธเจเฉเจฐเฉเจ เจฆเจพ เจฎเจพเจฐเจเฅคstreaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
เจเจฆเฉเจ เจเจน เจเจฎเจพเจเจก เจเฉฑเจฒ เจฐเจนเฉ เจนเฉ, เจ เจธเฉเจ เจเฉเจเจฒ เจเฉฐเจธเฉเจฒ เจตเจฟเฉฑเจ เจกเฉเจเจพเจซเจฒเฉ เจเฉเจฌ 'เจคเฉ เจเจพ เจธเจเจฆเฉ เจนเจพเจ เจ เจคเฉ เจธเจพเจกเฉ เจชเจพเจเจชเจฒเจพเจเจจ เจฆเฉเจ เจธเจเจฆเฉ เจนเจพเจเฅค เจเจฆเฉเจ เจ เจธเฉเจ เจชเจพเจเจชเจฒเจพเจเจจ 'เจคเฉ เจเจฒเจฟเฉฑเจ เจเจฐเจฆเฉ เจนเจพเจ, เจคเจพเจ เจธเจพเจจเฉเฉฐ เจเจฟเฉฑเจคเจฐ 4 เจฆเฉ เจธเจฎเจพเจจ เจเฉเจ เจฆเจฟเจเจพเจ เจฆเฉเจฃเจพ เจเจพเจนเฉเจฆเจพ เจนเฉเฅค เจกเฉเจฌเฉฑเจเจฟเฉฐเจ เจฆเฉ เจเจฆเฉเจธเจผเจพเจ เจฒเจ, เจตเจฟเจธเจคเฉเจฐเจฟเจค เจฒเฉเจเจธ เจจเฉเฉฐ เจฆเฉเจเจฃ เจฒเจ เจฒเฉเจเจธ เจ เจคเฉ เจซเจฟเจฐ เจธเจเฉเจเจกเฉเจฐเจพเจเจตเจฐ 'เจคเฉ เจเจพเจฃเจพ เจฌเจนเฉเจค เจฎเจฆเจฆเจเจพเจฐ เจนเฉ เจธเจเจฆเจพ เจนเฉเฅค เจเจธ เจจเฉ เจเจ เจฎเจพเจฎเจฒเจฟเจเจ เจตเจฟเฉฑเจ เจชเจพเจเจชเจฒเจพเจเจจ เจฆเฉ เจฎเฉเฉฑเจฆเจฟเจเจ เจจเฉเฉฐ เจนเฉฑเจฒ เจเจฐเจจ เจตเจฟเฉฑเจ เจฎเฉเจฐเฉ เจฎเจฆเจฆ เจเฉเจคเฉ เจนเฉเฅค
เจเจฟเฉฑเจคเจฐ 4: เจฌเฉเจฎ เจเจจเจตเฉเจ
เจฐ
BigQuery เจตเจฟเฉฑเจ เจธเจพเจกเฉ เจกเฉเจเจพ เจคเฉฑเจ เจชเจนเฉเฉฐเจ เจเจฐเฉ
เจเจธ เจฒเจ, เจธเจพเจกเฉ เจเฉเจฒ เจชเจนเจฟเจฒเจพเจ เจนเฉ เจเฉฑเจ เจชเจพเจเจชเจฒเจพเจเจจ เจนเฉเจฃเฉ เจเจพเจนเฉเจฆเฉ เจนเฉ เจเจฟเจธ เจตเจฟเฉฑเจ เจกเฉเจเจพ เจธเจพเจกเฉ เจเฉเจฌเจฒ เจตเจฟเฉฑเจ เจตเจนเจฟ เจฐเจฟเจนเจพ เจนเฉเฅค เจเจธเจฆเฉ เจเจพเจเจ เจเจฐเจจ เจฒเจ, เจ เจธเฉเจ BigQuery 'เจคเฉ เจเจพ เจธเจเจฆเฉ เจนเจพเจ เจ เจคเฉ เจกเฉเจเจพ เจจเฉเฉฐ เจฆเฉเจ เจธเจเจฆเฉ เจนเจพเจเฅค เจนเฉเจ เจพเจ เจฆเจฟเฉฑเจคเฉ เจเจฎเจพเจเจก เจฆเฉ เจตเจฐเจคเฉเจ เจเจฐเจจ เจคเฉเจ เจฌเจพเจ เจฆ เจคเฉเจนเจพเจจเฉเฉฐ เจกเฉเจเจพเจธเฉเจ เจฆเฉเจเจ เจชเจนเจฟเจฒเฉเจเจ เจเฉเจ เจเจคเจพเจฐเจพเจ เจฆเฉเจเจฃเฉเจเจ เจเจพเจนเฉเจฆเฉเจเจ เจนเจจเฅค เจนเฉเจฃ เจเจฆเฉเจ เจธเจพเจกเฉ เจเฉเจฒ BigQuery เจตเจฟเฉฑเจ เจธเจเฉเจฐ เจเฉเจคเจพ เจกเจพเจเจพ เจนเฉ, เจ เจธเฉเจ เจนเฉเจฐ เจตเจฟเจธเจผเจฒเฉเจธเจผเจฃ เจเจฐ เจธเจเจฆเฉ เจนเจพเจ, เจจเจพเจฒ เจนเฉ เจธเจนเจฟเจฏเฉเจเฉเจเจ เจจเจพเจฒ เจกเจพเจเจพ เจธเจพเจเจเจพ เจเจฐ เจธเจเจฆเฉ เจนเจพเจ เจ เจคเฉ เจเจพเจฐเฉเจฌเจพเจฐเฉ เจธเจตเจพเจฒเจพเจ เจฆเฉ เจเจตเจพเจฌ เจฆเฉเจฃเจพ เจธเจผเฉเจฐเฉ เจเจฐ เจธเจเจฆเฉ เจนเจพเจเฅค
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
เจเจฟเฉฑเจคเจฐ 5: BigQuery
เจธเจฟเฉฑเจเจพ
เจ เจธเฉเจ เจเจธ เจเจฐเจฆเฉ เจนเจพเจ เจเจฟ เจเจน เจชเฉเจธเจ เจเฉฑเจ เจธเจเฉเจฐเฉเจฎเจฟเฉฐเจ เจกเฉเจเจพ เจชเจพเจเจชเจฒเจพเจเจจ เจฌเจฃเจพเจเจฃ เจฆเฉ เจจเจพเจฒ-เจจเจพเจฒ เจกเฉเจเจพ เจจเฉเฉฐ เจตเจงเฉเจฐเฉ เจชเจนเฉเฉฐเจเจฏเฉเจ เจฌเจฃเจพเจเจฃ เจฆเฉ เจคเจฐเฉเจเฉ เจฒเฉฑเจญเจฃ เจฆเฉ เจเฉฑเจ เจเจชเจฏเฉเจเฉ เจเจฆเจพเจนเจฐเจฃ เจตเจเฉเจ เจเฉฐเจฎ เจเจฐเฉเจเฉเฅค เจเจธ เจซเจพเจฐเจฎเฉเจ เจตเจฟเฉฑเจ เจกเจพเจเจพ เจธเจเฉเจฐ เจเจฐเจจ เจจเจพเจฒ เจธเจพเจจเฉเฉฐ เจฌเจนเฉเจค เจธเจพเจฐเฉ เจซเจพเจเจฆเฉ เจฎเจฟเจฒเจฆเฉ เจนเจจเฅค เจนเฉเจฃ เจ เจธเฉเจ เจฎเจนเฉฑเจคเจตเจชเฉเจฐเจจ เจธเจตเจพเจฒเจพเจ เจฆเฉ เจเจตเจพเจฌ เจฆเฉเจฃเจพ เจธเจผเฉเจฐเฉ เจเจฐ เจธเจเจฆเฉ เจนเจพเจ เจเจฟเจตเฉเจ เจเจฟ เจเจฟเฉฐเจจเฉ เจฒเฉเจ เจธเจพเจกเฉ เจเจคเจชเจพเจฆ เจฆเฉ เจตเจฐเจคเฉเจ เจเจฐเจฆเฉ เจนเจจ? เจเฉ เจคเฉเจนเจพเจกเจพ เจเจชเจญเฉเจเจคเจพ เจ เจงเจพเจฐ เจธเจฎเฉเจ เจฆเฉ เจจเจพเจฒ เจตเจง เจฐเจฟเจนเจพ เจนเฉ? เจเจคเจชเจพเจฆ เจฆเฉ เจเจฟเจนเฉเฉ เจชเจนเจฟเจฒเฉเจเจ เจจเจพเจฒ เจฒเฉเจ เจธเจญ เจคเฉเจ เจตเฉฑเจง เจเฉฑเจฒเจฌเจพเจค เจเจฐเจฆเฉ เจนเจจ? เจ เจคเฉ เจเฉ เจ เจเจฟเจนเฉเจเจ เจเจฒเจคเฉเจเจ เจนเจจ เจเจฟเฉฑเจฅเฉ เจจเจนเฉเจ เจนเฉเจฃเฉเจเจ เจเจพเจนเฉเจฆเฉเจเจ เจนเจจ? เจเจน เจเจน เจธเจตเจพเจฒ เจนเจจ เจเฉ เจธเฉฐเจธเจฅเจพ เจฒเจ เจฆเจฟเจฒเจเจธเจชเฉ เจฆเฉ เจนเฉเจฃเจเฉ. เจเจนเจจเจพเจ เจธเจตเจพเจฒเจพเจ เจฆเฉ เจเจตเจพเจฌเจพเจ เจคเฉเจ เจเจญเจฐเจจ เจตเจพเจฒเฉเจเจ เจธเฉเจเจพเจ เจฆเฉ เจเจงเจพเจฐ 'เจคเฉ, เจ เจธเฉเจ เจเจคเจชเจพเจฆ เจจเฉเฉฐ เจฌเจฟเจนเจคเจฐ เจฌเจฃเจพ เจธเจเจฆเฉ เจนเจพเจ เจ เจคเฉ เจเจชเจญเฉเจเจคเจพ เจฆเฉ เจธเจผเจฎเฉเจฒเฉเจ เจค เจตเจงเจพ เจธเจเจฆเฉ เจนเจพเจเฅค
เจฌเฉเจฎ เจเจธ เจเจฟเจธเจฎ เจฆเฉ เจเจธเจฐเจค เจฒเจ เจ เจธเจฒ เจตเจฟเฉฑเจ เจฒเจพเจญเจฆเจพเจเจ เจนเฉ เจ เจคเฉ เจเจธ เจฆเฉ เจจเจพเจฒ-เจจเจพเจฒ เจเจ เจนเฉเจฐ เจฆเจฟเจฒเจเจธเจช เจตเจฐเจคเฉเจ เจฆเฉ เจเฉเจธ เจตเฉ เจนเจจเฅค เจเจฆเจพเจนเจฐเจจ เจฒเจ, เจคเฉเจธเฉเจ เจฐเฉเจ เจฒ เจเจพเจเจฎ เจตเจฟเฉฑเจ เจธเจเจพเจ เจเจฟเฉฑเจ เจกเฉเจเจพ เจฆเจพ เจตเจฟเจธเจผเจฒเฉเจธเจผเจฃ เจเจฐเจจเจพ เจเจพเจนเฉเฉฐเจฆเฉ เจนเฉ เจ เจคเฉ เจตเจฟเจธเจผเจฒเฉเจธเจผเจฃ เจฆเฉ เจ เจงเจพเจฐ เจคเฉ เจตเจชเจพเจฐ เจเจฐเจจเจพ เจเจพเจนเฉเฉฐเจฆเฉ เจนเฉ, เจธเจผเจพเจเจฆ เจคเฉเจนเจพเจกเฉ เจเฉเจฒ เจตเจพเจนเจจเจพเจ เจคเฉเจ เจเจเจฃ เจตเจพเจฒเฉ เจธเฉเจเจธเจฐ เจกเฉเจเจพ เจนเจจ เจ เจคเฉ เจเฉเจฐเฉเจซเจฟเจ เจชเฉฑเจงเจฐ เจฆเฉ เจเจฃเจจเจพ เจเจฐเจจเจพ เจเจพเจนเฉเฉฐเจฆเฉ เจนเฉ. เจคเฉเจธเฉเจ, เจเจฆเจพเจนเจฐเจจ เจฒเจ, เจเฉฑเจ เจเฉเจฎเจฟเฉฐเจ เจเฉฐเจชเจจเฉ เจตเฉ เจนเฉ เจธเจเจฆเฉ เจนเฉ เจเฉ เจเจชเจญเฉเจเจคเจพ เจกเฉเจเจพ เจเจเฉฑเจ เจพ เจเจฐเจฆเฉ เจนเฉ เจ เจคเฉ เจฎเฉเฉฑเจ เจฎเฉเจเฉเจฐเจฟเจเจธ เจจเฉเฉฐ เจเจฐเฉเจ เจเจฐเจจ เจฒเจ เจกเฉเจธเจผเจฌเฉเจฐเจก เจฌเจฃเจพเจเจฃ เจฒเจ เจเจธเจฆเฉ เจตเจฐเจคเฉเจ เจเจฐเจฆเฉ เจนเฉเฅค เจ เฉเจ เจนเฉ, เจธเฉฑเจเจฃ, เจเจน เจเฉฑเจ เจนเฉเจฐ เจชเฉเจธเจ เจฒเจ เจเฉฑเจ เจตเจฟเจธเจผเจพ เจนเฉ, เจชเฉเฉเจนเจจ เจฒเจ เจงเฉฐเจจเจตเจพเจฆ, เจ เจคเฉ เจเจนเจจเจพเจ เจฒเจ เจเฉ เจชเฉเจฐเจพ เจเฉเจก เจฆเฉเจเจฃเจพ เจเจพเจนเฉเฉฐเจฆเฉ เจนเจจ, เจนเฉเจ เจพเจ เจฎเฉเจฐเฉ GitHub เจฆเจพ เจฒเจฟเฉฐเจ เจนเฉเฅค
เจเจน เจธเจญ เจนเฉ.
เจธเจฐเฉเจค: www.habr.com