เดเดฒเตเดฒเดพเดตเตผเดเตเดเตเด เดนเดพเดฏเต. เดเตเดดเตโเดธเดฟเดฒเต เดตเดฟเดฆเตเดฏเดพเตผเดคเตเดฅเดฟเดเตพเดเตเดเดพเดฏเดฟ เดชเตเดฐเดคเตเดฏเตเดเด เดคเดฏเตเดฏเดพเดฑเดพเดเตเดเดฟเดฏ เดฒเตเดเดจเดคเตเดคเดฟเดจเตเดฑเต เด
เดตเดธเดพเดจ เดญเดพเดเดคเตเดคเดฟเดจเตเดฑเต เดตเดฟเดตเตผเดคเตเดคเดจเด เดเดเตเดเตพ เดชเดเตเดเดฟเดเตเดจเตเดจเต.
เดคเดคเตเดธเดฎเดฏ เดชเตเดชเตเดชเตเดฒเตเดจเตเดเตพเดเตเดเดพเดฏเดฟ เด เดชเตเดชเดพเดเตเดเต เดฌเตเดฎเตเด เดกเดพเดฑเตเดฑเดพเดซเตเดฒเตเดฏเตเด
Google เดเตเดฒเตเดกเต เดธเดเตเดเตเดเดฐเดฟเดเตเดเตเดจเตเดจเต
เดถเตเดฐเดฆเตเดงเดฟเดเตเดเตเด: เดชเตเดคเตเดคเตบ 3-เตฝ เดชเตเดชเตเดชเตโเดฒเตเตป เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเดเตเดเตเดจเตเดจเดคเดฟเตฝ เดเดจเดฟเดเตเดเต เดชเตเดฐเดถเตโเดจเดฎเตเดฃเตเดเดพเดฏเดคเดฟเดจเดพเตฝ เดชเตเดชเตเดชเตโเดฒเตเตป เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเดเตเดเตเดจเตเดจเดคเดฟเดจเตเด เดเดทเตโเดเดพเดจเตเดธเตเดค เดฒเตเดเต เดกเดพเดฑเตเดฑ เดชเตเดฐเดธเดฟเดฆเตเดงเตเดเดฐเดฟเดเตเดเตเดจเตเดจเดคเดฟเดจเตเด เดเดพเตป Google เดเตเดฒเตเดกเต เดทเตเตฝ เดเดชเดฏเตเดเดฟเดเตเดเต.
เดชเตเดชเตเดชเตเดฒเตเตป เดเดฐเดเดญเดฟเดเตเดเตเดจเตเดจเดคเดฟเดจเต, เดเดเตเดเตพ เดเตเดฐเดฎเตเดเดฐเดฃเดเตเดเดณเดฟเตฝ เด
เตฝเดชเตเดชเด เดเตเดดเดฟเดเตเดเตเดฃเตเดเดคเตเดฃเตเดเต. เดจเดฟเดเตเดเดณเดฟเตฝ เดฎเตเดฎเตเดชเต GCP เดเดชเดฏเตเดเดฟเดเตเดเดพเดคเตเดคเดตเตผเดเตเดเดพเดฏเดฟ, เดเดคเดฟเตฝ เดตเดฟเดตเดฐเดฟเดเตเดเดฟเดฐเดฟเดเตเดเตเดจเตเดจ 6 เดเดเตเดเดเตเดเตพ เดจเดฟเดเตเดเตพ เดชเดฟเดจเตเดคเตเดเดฐเตเดฃเตเดเดคเตเดฃเตเดเต
เดเดคเดฟเดจเตเดถเตเดทเด, เดเดเตเดเดณเตเดเต เดธเตโเดเตเดฐเดฟเดชเตเดฑเตเดฑเตเดเตพ Google เดเตเดฒเตเดกเต เดธเตเดฑเตเดฑเตเดฑเตเดเดฟเดฒเตเดเตเดเต เด
เดชเตโเดฒเตเดกเต เดเตเดฏเตเดฏเตเดเดฏเตเด เด
เดต เดเดเตเดเดณเตเดเต Google เดเตเดฒเตเดกเต เดทเตเดฒเตเดฒเดฟเดฒเตเดเตเดเต เดชเดเตผเดคเตเดคเตเดเดฏเตเด เดเตเดฏเตเดฏเตเดฃเตเดเดคเตเดฃเตเดเต. เดเตเดฒเตเดกเต เดธเตเดฑเตเดฑเตเดฑเตเดเดฟเดฒเตเดเตเดเต เด
เดชเตโเดฒเตเดกเต เดเตเดฏเตเดฏเตเดจเตเดจเดคเต เดตเดณเดฐเต เดจเดฟเดธเตเดธเดพเดฐเดฎเดพเดฃเต (เดเดฐเต เดตเดฟเดตเดฐเดฃเด เดเดฃเตเดเตเดคเตเดคเดพเดจเดพเดเตเด
2 เดเดฟเดคเตเดฐเด
เดจเดฎเตเดเตเดเต เดซเดฏเดฒเตเดเตพ เดชเดเตผเดคเตเดคเดพเดจเตเด เดเดตเดถเตเดฏเดฎเดพเดฏ เดฒเตเดฌเตเดฐเดฑเดฟเดเตพ เดเตปเดธเตเดฑเตเดฑเดพเตพ เดเตเดฏเตเดฏเดพเดจเตเด เดเดตเดถเตเดฏเดฎเดพเดฏ เดเดฎเดพเตปเดกเตเดเตพ เดเตเดตเดเต เดชเดเตเดเดฟเดเดชเตเดชเตเดเตเดคเตเดคเดฟเดฏเดฟเดฐเดฟเดเตเดเตเดจเตเดจเต.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
เดเดเตเดเดณเตเดเต เดกเดพเดฑเตเดฑเดพเดฌเตเดธเตเด เดชเดเตเดเดฟเดเดฏเตเด เดธเตเดทเตเดเดฟเดเตเดเตเดจเตเดจเต
เดธเดเตเดเตเดเดฐเดฃเดตเตเดฎเดพเดฏเดฟ เดฌเดจเตเดงเดชเตเดชเตเดเตเด เดเดฒเตเดฒเดพ เดเดเตเดเดเตเดเดณเตเด เดเดเตเดเตพ เดชเตเตผเดคเตเดคเดฟเดฏเดพเดเตเดเดฟเดเตเดเดดเดฟเดเตเดเดพเตฝ, เด
เดเตเดคเตเดคเดคเดพเดฏเดฟ เดจเดฎเตเดฎเตพ เดเตเดฏเตเดฏเตเดฃเตเดเดคเต BigQuery-เดฏเดฟเตฝ เดเดฐเต เดกเดพเดฑเตเดฑเดพเดธเตเดฑเตเดฑเตเด เดชเดเตเดเดฟเดเดฏเตเด เดธเตเดทเตเดเดฟเดเตเดเตเด เดเดจเตเดจเดคเดพเดฃเต. เดเดคเต เดเตเดฏเตเดฏเตเดจเตเดจเดคเดฟเดจเต เดจเดฟเดฐเดตเดงเดฟ เดฎเดพเตผเดเดเตเดเดณเตเดฃเตเดเต, เดเดจเตเดจเดพเตฝ เดเดฑเตเดฑเดตเตเด เดฒเดณเดฟเดคเดฎเดพเดฏเดคเต เดเดฆเตเดฏเด เดเดฐเต เดกเดพเดฑเตเดฑเดพเดธเตเดฑเตเดฑเต เดธเตเดทเตเดเดฟเดเตเดเต Google เดเตเดฒเตเดกเต เดเตบเดธเตเตพ เดเดชเดฏเตเดเดฟเดเตเดเตเด เดเดจเตเดจเดคเดพเดฃเต. เดจเดฟเดเตเดเตพเดเตเดเต เดเตเดตเดเตเดฏเตเดณเตเดณ เดเดเตเดเดเตเดเตพ เดชเดฟเดจเตเดคเตเดเดฐเดพเด
เดเดฟเดคเตเดฐเด 3. เดชเดเตเดเดฟเด เดฒเตเดเดเตเดเต
เดเดชเดฏเตเดเตเดคเต เดฒเตเดเต เดกเดพเดฑเตเดฑ เดชเตเดฐเดธเดฟเดฆเตเดงเตเดเดฐเดฟเดเตเดเตเดจเตเดจเต
เดชเดฌเต/เดธเดฌเต เดเดเตเดเดณเตเดเต เดชเตเดชเตเดชเตเดฒเตเดจเดฟเดจเตเดฑเต เดเดฐเต เดจเดฟเตผเดฃเดพเดฏเด เดเดเดเดฎเดพเดฃเต, เดเดพเดฐเดฃเด เดเดคเต เดเดจเตเดจเดฟเดฒเดงเดฟเดเด เดธเตเดตเดคเดจเตเดคเตเดฐ เดเดชเตเดฒเดฟเดเตเดเตเดทเดจเตเดเดณเต เดชเดฐเดธเตเดชเดฐเด เดเดถเดฏเดตเดฟเดจเดฟเดฎเดฏเด เดจเดเดคเตเดคเดพเตป เด เดจเตเดตเดฆเดฟเดเตเดเตเดจเตเดจเต. เดชเตเดฐเดคเตเดฏเตเดเดฟเดเตเดเตเด, เดเดชเตเดฒเดฟเดเตเดเตเดทเดจเตเดเตพเดเตเดเดฟเดเดฏเดฟเตฝ เดธเดจเตเดฆเตเดถเดเตเดเตพ เด เดฏเดฏเตเดเตเดเดพเดจเตเด เดธเตเดตเตเดเดฐเดฟเดเตเดเดพเดจเตเด เดเดเตเดเดณเต เด เดจเตเดตเดฆเดฟเดเตเดเตเดจเตเดจ เดเดฐเต เดเดเดจเดฟเดฒเดเตเดเดพเดฐเดจเดพเดฏเดฟ เดเดคเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเดเตเดเตเดจเตเดจเต. เดจเดฎเตเดฎเตพ เดเดฆเตเดฏเด เดเตเดฏเตเดฏเตเดฃเตเดเดคเต เดเดฐเต เดตเดฟเดทเดฏเด เดธเตเดทเตเดเดฟเดเตเดเตเด เดเดจเตเดจเดคเดพเดฃเต. เดเตบเดธเตเดณเดฟเดฒเต เดชเดฌเต/เดธเดฌเต เดเดจเตเดจเดคเดฟเดฒเตเดเตเดเต เดชเตเดฏเดฟ เดตเดฟเดทเดฏเด เดธเตเดทเตโเดเดฟเดเตเดเตเด เดเตเดฒเดฟเดเตเดเต เดเตเดฏเตเดฏเตเด.
เดฎเตเดเดณเดฟเตฝ เดจเดฟเตผเดตเดเดฟเดเตเดเดฟเดฐเดฟเดเตเดเตเดจเตเดจ เดฒเตเดเต เดกเดพเดฑเตเดฑ เดธเตเดทเตโเดเดฟเดเตเดเดพเตป เดเตเดตเดเตเดฏเตเดณเตเดณ เดเตเดกเต เดเดเตเดเดณเตเดเต เดธเตโเดเตเดฐเดฟเดชเตโเดฑเตเดฑเดฟเดจเต เดตเดฟเดณเดฟเดเตเดเตเดจเตเดจเต, เดคเตเดเตผเดจเตเดจเต เดฒเตเดเตเดเตพ เดชเดฌเต/เดธเดฌเดฟเดฒเตเดเตเดเต เดเดฃเดเตโเดฑเตเดฑเต เดเตเดฏเตโเดคเต เด
เดฏเดฏเตโเดเตเดเตเดจเตเดจเต. เดจเดฎเตเดฎเตพ เดเตเดฏเตเดฏเตเดฃเตเดเดคเต เดเดฐเต เดตเดธเตเดคเตเดตเดฟเดจเต เดธเตเดทเตเดเดฟเดเตเดเตเด เดเดจเตเดจเดคเดพเดฃเต เดชเตเดฐเดธเดพเดงเดเตป, เดฐเตเดคเดฟ เดเดชเดฏเตเดเดฟเดเตเดเต เดตเดฟเดทเดฏเดคเตเดคเดฟเดฒเตเดเตเดเตเดณเตเดณ เดชเดพเดค เดตเตเดฏเดเตเดคเดฎเดพเดเตเดเตเด topic_path
เดเดชเตเดชเด เดซเดเดเตโเดทเตป เดตเดฟเดณเดฟเดเตเดเตเด publish
ั topic_path
เดกเดพเดฑเตเดฑเดฏเตเด. เดเดเตเดเตพ เดเดฑเดเตเดเตเดฎเดคเดฟ เดเตเดฏเตเดฏเตเดจเตเดจ เดเดพเดฐเตเดฏเด เดถเตเดฐเดฆเตเดงเดฟเดเตเดเตเด generate_log_line
เดเดเตเดเดณเตเดเต เดธเตเดเตเดฐเดฟเดชเตเดฑเตเดฑเดฟเตฝ เดจเดฟเดจเตเดจเต stream_logs
, เด
เดคเดฟเดจเดพเตฝ เด เดซเดฏเดฒเตเดเตพ เดเดฐเต เดซเตเตพเดกเดฑเดฟเดฒเดพเดฃเตเดจเตเดจเต เดเดฑเดชเตเดชเดพเดเตเดเตเด, เด
เดฒเตเดฒเดพเดคเตเดคเดชเดเตเดทเด เดจเดฟเดเตเดเตพเดเตเดเต เดเดฐเต เดเดฑเดเตเดเตเดฎเดคเดฟ เดชเดฟเดถเดเต เดฒเดญเดฟเดเตเดเตเด. เดชเดฟเดจเตเดจเตเดเต เดเดคเต เดเดชเดฏเตเดเดฟเดเตเดเต เดเดเตเดเดณเตเดเต เดเตเดเดฟเตพ เดเตบเดธเตเดณเดฟเดฒเตเดเต เดเดคเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเดเตเดเดพเด:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
เดซเดฏเตฝ เดชเตเดฐเดตเตผเดคเตเดคเดฟเดเตเดเตเดจเตเดจ เดเดเตป, เดคเดพเดดเตเดฏเตเดณเตเดณ เดเดฟเดคเตเดฐเดคเตเดคเดฟเตฝ เดเดพเดฃเดฟเดเตเดเดฟเดฐเดฟเดเตเดเตเดจเตเดจเดคเตเดชเตเดฒเต, เดเตบเดธเตเดณเดฟเดฒเตเดเตเดเตเดณเตเดณ เดฒเตเดเต เดกเดพเดฑเตเดฑเดฏเตเดเต เดเดเตเดเตเดชเตเดเตเดเต เดจเดฎเตเดเตเดเต เดเดพเดฃเดพเตป เดเดดเดฟเดฏเตเด. เดจเดฎเตเดฎเตพ เดเดชเดฏเตเดเดฟเดเตเดเดพเดคเตเดคเดฟเดเดคเตเดคเตเดณเด เดเดพเดฒเด เด เดธเตเดเตเดฐเดฟเดชเตเดฑเตเดฑเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเดเตเดเตเด CTRL + Cเด เดคเต เดชเตเตผเดคเตเดคเดฟเดฏเดพเดเตเดเดพเตป.
เดเดฟเดคเตเดฐเด 4. เดเดเตเดเตเดชเตเดเตเดเต publish_logs.py
เดเดเตเดเดณเตเดเต เดชเตเดชเตเดชเตเดฒเตเตป เดเตเดกเต เดเดดเตเดคเตเดจเตเดจเต
เดเดชเตเดชเตเตพ เดเดเตเดเตพ เดเดฒเตเดฒเดพเด เดคเดฏเตเดฏเดพเดฑเดพเดเตเดเดฟเดฏเดฟเดเตเดเตเดฃเตเดเต, เดจเดฎเตเดเตเดเต เดฐเดธเดเดฐเดฎเดพเดฏ เดญเดพเดเด เดเดฐเดเดญเดฟเดเตเดเดพเด - เดฌเตเด, เดชเตเดคเตเดคเตบ เดเดจเตเดจเดฟเดต เดเดชเดฏเตเดเดฟเดเตเดเต เดเดเตเดเดณเตเดเต เดชเตเดชเตเดชเตเดฒเตเตป เดเตเดกเดฟเดเดเต. เดเดฐเต เดฌเตเด เดชเตเดชเตเดชเตเดฒเตเตป เดธเตเดทเตเดเดฟเดเตเดเตเดจเตเดจเดคเดฟเดจเต, เดเดเตเดเตพ เดเดฐเต เดชเตเดชเตเดชเตเดฒเตเตป เดเดฌเตเดเดเตเดฑเตเดฑเต (p) เดธเตเดทเตเดเดฟเดเตเดเตเดฃเตเดเดคเตเดฃเตเดเต. เดเดฐเต เดชเตเดชเตเดชเต เดฒเตเตป เดเดฌเตโเดเดเตโเดฑเตเดฑเต เดธเตเดทเตโเดเดฟเดเตเดเตเดเดดเดฟเดเตเดเดพเตฝ, เดเดชเตเดชเดฑเตเดฑเตเดฑเตผ เดเดชเดฏเตเดเดฟเดเตเดเต เดจเดฎเตเดเตเดเต เดเดจเตเดจเดฟเดฒเดงเดฟเดเด เดซเดเดเตโเดทเดจเตเดเตพ เดชเตเดฐเดฏเตเดเดฟเดเตเดเดพเตป เดเดดเดฟเดฏเตเด pipe (|)
. เดชเตเดคเตเดตเต, เดตเตผเดเตเดเตเดซเตเดฒเต เดเตเดตเดเตเดฏเตเดณเตเดณ เดเดฟเดคเตเดฐเด เดชเตเดฒเต เดเดพเดฃเดชเตเดชเตเดเตเดจเตเดจเต.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
เดเดเตเดเดณเตเดเต เดเตเดกเดฟเตฝ, เดเดเตเดเตพ เดฐเดฃเตเดเต เดเดทเตโเดเดพเดจเตเดธเตเดค เดชเตเดฐเดตเตผเดคเตเดคเดจเดเตเดเตพ เดธเตเดทเตเดเดฟเดเตเดเตเด. เดซเดเดเตเดทเตป regex_clean
, เด
เดคเต เดกเดพเดฑเตเดฑ เดธเตเดเดพเตป เดเตเดฏเตเดฏเตเดเดฏเตเด เดซเดเดเตเดทเตป เดเดชเดฏเตเดเดฟเดเตเดเต เดชเดพเดฑเตเดฑเตเตบเดธเต เดฒเดฟเดธเตเดฑเตเดฑเดฟเดจเต เด
เดเดฟเดธเตเดฅเดพเดจเดฎเดพเดเตเดเดฟ เด
เดจเตเดฌเดจเตเดง เดตเดฐเดฟ เดตเตเดฃเตเดเตเดเตเดเตเดเตเดเดฏเตเด เดเตเดฏเตเดฏเตเดจเตเดจเต re.search
. เดซเดเดเตเดทเตป เดเตเดฎเดฏเดพเตฝ เดตเตเตผเดคเดฟเดฐเดฟเดเตเด เดเดฐเต เดธเตเดเตเดฐเดฟเดเดเต เดจเตฝเดเตเดจเตเดจเต. เดจเดฟเดเตเดเตพ เดเดฐเต เดธเดพเดงเดพเดฐเดฃ เดเดเตเดธเตเดชเตเดฐเดทเตป เดตเดฟเดฆเดเตเดฆเตเดงเดจเดฒเตเดฒเตเดเตเดเดฟเตฝ, เดเดคเต เดชเดฐเดฟเดถเตเดงเดฟเดเตเดเดพเตป เดเดพเตป เดถเตเดชเดพเตผเดถ เดเตเดฏเตเดฏเตเดจเตเดจเต datetime
เดเดฐเต เดซเดเดเตโเดทเดจเตเดฑเต เดเดณเตเดณเดฟเตฝ เด
เดคเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเดเตเดเตเดจเตเดจเต. เดซเดฏเดฒเดฟเดจเตเดฑเต เดคเตเดเดเตเดเดคเตเดคเดฟเตฝ เดเดจเดฟเดเตเดเต เดเดฐเต เดเดฑเดเตเดเตเดฎเดคเดฟ เดชเดฟเดถเดเต เดฒเดญเดฟเดเตเดเตเดจเตเดจเต, เด
เดคเต เดตเดฟเดเดฟเดคเตเดฐเดฎเดพเดฏเดฟเดฐเตเดจเตเดจเต. เด เดฒเดฟเดธเตเดฑเตเดฑเต เดชเดฟเดจเตเดจเตเดเต เดซเดเดเตเดทเดจเดฟเดฒเตเดเตเดเต เดเตเดฎเดพเดฑเตเด WriteToBigQuery, เดเดคเต เดเดเตเดเดณเตเดเต เดกเดพเดฑเตเดฑเดฏเต เดชเดเตเดเดฟเดเดฏเดฟเดฒเตเดเตเดเต เดเตเตผเดเตเดเตเดจเตเดจเต. Batch DataFlow Job, Streaming DataFlow Job เดเดจเตเดจเดฟเดตเดฏเตเดเต เดเตเดกเต เดเตเดตเดเต เดจเตฝเดเดฟเดฏเดฟเดฐเดฟเดเตเดเตเดจเตเดจเต. เดฌเดพเดเตเดเตเด เดธเตเดเตเดฐเตเดฎเดฟเดเดเต เดเตเดกเตเด เดคเดฎเตเดฎเดฟเดฒเตเดณเตเดณ เดเดฐเตเดฏเตเดฐเต เดตเตเดฏเดคเตเดฏเดพเดธเด เดฌเดพเดเตเดเดฟเตฝ เดเดเตเดเตพ CSV เดตเดพเดฏเดฟเดเตเดเตเดจเตเดจเต เดเดจเตเดจเดคเดพเดฃเต src_path
เดซเดเดเตเดทเตป เดเดชเดฏเตเดเดฟเดเตเดเต ReadFromText
เดฌเตเดฎเดฟเตฝ เดจเดฟเดจเตเดจเต.
เดฌเดพเดเตเดเต เดกเดพเดฑเตเดฑเดซเตเดฒเต เดเตเดฒเดฟ (เดฌเดพเดเตเดเต เดชเตเดฐเตเดธเดธเตเดธเดฟเดเดเต)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
เดธเตเดเตเดฐเตเดฎเดฟเดเดเต เดกเดพเดฑเตเดฑเดซเตเดฒเต เดเตเดฒเดฟ (เดธเตเดเตเดฐเตเด เดชเตเดฐเตเดธเดธเตเดธเดฟเดเดเต)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
เดเตบเดตเตเดฏเตผ เดเดฐเดเดญเดฟเดเตเดเตเดจเตเดจเต
เดชเตเดชเตเดชเต เดฒเตเตป เดจเดฎเตเดเตเดเต เดชเดฒ เดคเดฐเดคเตเดคเดฟเตฝ เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเดเตเดเดพเด. เดเดเตเดเตพเดเตเดเต เดตเตเดฃเดฎเตเดเตเดเดฟเตฝ, เดเดฟเดธเดฟเดชเดฟเดฏเดฟเดฒเตเดเตเดเต เดตเดฟเดฆเตเดฐเดฎเดพเดฏเดฟ เดฒเตเดเดฟเตป เดเตเดฏเตเดฏเตเดฎเตเดชเตเตพ เดเตเตผเดฎเดฟเดจเดฒเดฟเตฝ เดจเดฟเดจเตเดจเต เดชเตเดฐเดพเดฆเตเดถเดฟเดเดฎเดพเดฏเดฟ เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเดเตเดเดพเด.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
เดเดจเตเดจเดฟเดฐเตเดจเตเดจเดพเดฒเตเด, DataFlow เดเดชเดฏเตเดเดฟเดเตเดเต เดเดเตเดเตพ เดเดคเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเดเตเดเดพเตป เดชเตเดเตเดจเตเดจเต. เดเดจเดฟเดชเตเดชเดฑเดฏเตเดจเตเดจ เดเดตเดถเตเดฏเดฎเดพเดฏ เดชเดพเดฐเดพเดฎเตเดฑเตเดฑเดฑเตเดเตพ เดธเดเตเดเตเดเดฐเดฟเดเตเดเต เดเตเดตเดเตเดฏเตเดณเตเดณ เดเดฎเดพเตปเดกเต เดเดชเดฏเตเดเดฟเดเตเดเต เดจเดฎเตเดเตเดเต เดเดคเต เดเตเดฏเตเดฏเดพเตป เดเดดเดฟเดฏเตเด.
project
โ เดจเดฟเดเตเดเดณเตเดเต GCP เดชเตเดฐเตเดเดเตเดฑเตเดฑเดฟเดจเตเดฑเต เดเดกเดฟ.runner
เดจเดฟเดเตเดเดณเตเดเต เดชเตเดฐเตเดเตเดฐเดพเด เดตเดฟเดถเดเดฒเดจเด เดเตเดฏเตเดฏเตเดเดฏเตเด เดชเตเดชเตเดชเตเดฒเตเตป เดจเดฟเตผเดฎเตเดฎเดฟเดเตเดเตเดเดฏเตเด เดเตเดฏเตเดฏเตเดจเตเดจ เดเดฐเต เดชเตเดชเตเดชเตเดฒเตเตป เดฑเดฃเตเดฃเดฑเดพเดฃเต. เดเตเดฒเตเดกเดฟเตฝ เดชเตเดฐเดตเตผเดคเตเดคเดฟเดเตเดเดพเตป, เดจเดฟเดเตเดเตพ เดเดฐเต DataflowRunner เดตเตเดฏเดเตเดคเดฎเดพเดเตเดเดฃเด.staging_location
โ เดเตเดฒเดฟ เดจเดฟเตผเดตเดนเดฟเดเตเดเตเดจเตเดจ เดชเตเดฐเตเดธเดธเตเดธเดฑเตเดเตพเดเตเดเต เดเดตเดถเตเดฏเดฎเดพเดฏ เดเตเดกเต เดชเดพเดเตเดเตเดเตเดเตพ เดธเตเดเดฟเดเดฏเดฟเดฒเดพเดเตเดเตเดจเตเดจเดคเดฟเดจเตเดณเตเดณ เดเตเดฒเตเดกเต เดกเดพเดฑเตเดฑเดพเดซเตเดฒเต เดเตเดฒเตเดกเต เดธเตเดฑเตเดฑเตเดฑเตเดเดฟเดฒเตเดเตเดเตเดณเตเดณ เดชเดพเดค.temp_location
โ เดชเตเดชเตเดชเตโเดฒเตเตป เดชเตเดฐเดตเตผเดคเตเดคเดฟเดเตเดเตเดฎเตเดชเตเตพ เดธเตเดทเตโเดเดฟเดเตเด เดคเดพเตฝเดเตเดเดพเดฒเดฟเด เดเตเดฒเดฟ เดซเดฏเดฒเตเดเตพ เดธเดเดญเดฐเดฟเดเตเดเตเดจเตเดจเดคเดฟเดจเตเดณเตเดณ เดเตเดฒเตเดกเต เดกเดพเดฑเตเดฑเดพเดซเตเดฒเต เดเตเดฒเตเดกเต เดธเดเดญเดฐเดฃเดคเตเดคเดฟเดฒเตเดเตเดเตเดณเตเดณ เดชเดพเดค.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
เด เดเดฎเดพเตปเดกเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเดเตเดเตเดฎเตเดชเตเตพ, เดจเดฎเตเดเตเดเต เดเตเดเดฟเตพ เดเตบเดธเตเดณเดฟเดฒเต เดกเดพเดฑเตเดฑเดพเดซเตเดฒเต เดเดพเดฌเดฟเดฒเตเดเตเดเต เดชเตเดฏเดฟ เดเดเตเดเดณเตเดเต เดชเตเดชเตเดชเตเดฒเตเตป เดเดพเดฃเดพเด. เดชเตเดชเตเดชเตโเดฒเตเดจเดฟเตฝ เดเตเดฒเดฟเดเตเดเต เดเตเดฏเตเดฏเตเดฎเตเดชเตเตพ, เดเดฟเดคเตเดฐเด 4-เดจเต เดธเดฎเดพเดจเดฎเดพเดฏ เดเดจเตเดจเต เดจเดฎเตเดฎเตพ เดเดพเดฃเตเด. เดกเตเดฌเดเตเดเดฟเดเดเต เดเดตเดถเตเดฏเดเตเดเตพเดเตเดเดพเดฏเดฟ, เดตเดฟเดถเดฆเดฎเดพเดฏ เดฒเตเดเตเดเตพ เดเดพเดฃเตเดจเตเดจเดคเดฟเดจเต เดฒเตเดเตเดเดณเดฟเดฒเตเดเตเดเตเด เดคเตเดเตผเดจเตเดจเต Stackdriver-เดฒเตเดฏเตเดเตเดเตเด เดชเตเดเตเดจเตเดจเดคเต เดตเดณเดฐเต เดธเดนเดพเดฏเดเดฐเดฎเดพเดฃเต. เดจเดฟเดฐเดตเดงเดฟ เดเตเดธเตเดเดณเดฟเดฒเต เดชเตเดชเตเดชเต เดฒเตเตป เดชเตเดฐเดถเตเดจเดเตเดเตพ เดชเดฐเดฟเดนเดฐเดฟเดเตเดเดพเตป เดเดคเต เดเดจเตเดจเต เดธเดนเดพเดฏเดฟเดเตเดเดฟเดเตเดเตเดฃเตเดเต.
เดเดฟเดคเตเดฐเด 4: เดฌเตเด เดเตบเดตเตเดฏเตผ
BigQuery-เตฝ เดเดเตเดเดณเตเดเต เดกเดพเดฑเตเดฑ เดเดเตโเดธเดธเต เดเตเดฏเตเดฏเตเด
เด เดคเดฟเดจเดพเตฝ, เดเดเตเดเดณเตเดเต เดเตเดฌเดฟเดณเดฟเดฒเตเดเตเดเต เดกเดพเดฑเตเดฑ เดเดดเตเดเตเดจเตเดจ เดเดฐเต เดชเตเดชเตเดชเตเดฒเตเตป เดเดคเดฟเดจเดเด เดคเดจเตเดจเต เดเดฃเตเดเดพเดฏเดฟเดฐเดฟเดเตเดเดฃเด. เดเดคเต เดชเดฐเดฟเดถเตเดงเดฟเดเตเดเตเดจเตเดจเดคเดฟเดจเต, เดจเดฎเตเดเตเดเต BigQuery-เดฒเตเดเตเดเต เดชเตเดฏเดฟ เดกเดพเดฑเตเดฑ เดจเตเดเตเดเดพเด. เดเตเดตเดเตเดฏเตเดณเตเดณ เดเดฎเดพเตปเดกเต เดเดชเดฏเตเดเดฟเดเตเดเดคเดฟเดจเต เดถเตเดทเด เดจเดฟเดเตเดเตพ เดกเดพเดฑเตเดฑเดพเดธเตเดฑเตเดฑเดฟเดจเตเดฑเต เดเดฆเตเดฏ เดเตเดฑเดเตเดเต เดตเดฐเดฟเดเตพ เดเดพเดฃเตเด. เดเดชเตเดชเตเตพ BigQuery-เตฝ เดกเดพเดฑเตเดฑ เดธเดเดญเดฐเดฟเดเตเดเดฟเดฐเดฟเดเตเดเตเดจเตเดจเดคเดฟเดจเดพเตฝ, เดเดเตเดเตพเดเตเดเต เดเตเดเตเดคเตฝ เดตเดฟเดถเดเดฒเดจเด เดจเดเดคเตเดคเดพเดจเตเด เด เดคเตเดชเตเดฒเต เดธเดนเดชเตเดฐเดตเตผเดคเตเดคเดเดฐเตเดฎเดพเดฏเดฟ เดกเดพเดฑเตเดฑ เดชเดเตเดเดฟเดเดพเดจเตเด เดฌเดฟเดธเดฟเดจเดธเต เดเตเดฆเตเดฏเดเตเดเตพเดเตเดเต เดเดคเตเดคเดฐเด เดจเตฝเดเดพเดจเตเด เดเดดเดฟเดฏเตเด.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
เดเดฟเดคเตเดฐเด 5: BigQuery
เดคเตเดฐเตเดฎเดพเดจเด
เดเดฐเต เดธเตเดเตเดฐเตเดฎเดฟเดเดเต เดกเดพเดฑเตเดฑเดพ เดชเตเดชเตเดชเตโเดฒเตเตป เดธเตเดทเตโเดเดฟเดเตเดเตเดจเตเดจเดคเดฟเดจเตเด เดกเดพเดฑเตเดฑ เดเตเดเตเดคเตฝ เดเดเตโเดธเดธเต เดเตเดฏเตเดฏเดพเดจเตเดณเตเดณ เดตเดดเดฟเดเตพ เดเดฃเตเดเตเดคเตเดคเตเดจเตเดจเดคเดฟเดจเตเดฎเตเดณเตเดณ เดเดชเดฏเตเดเดชเตเดฐเดฆเดฎเดพเดฏ เดเดฆเดพเดนเดฐเดฃเดฎเดพเดฏเดฟ เด เดชเตเดธเตเดฑเตเดฑเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเดเตเดเตเดฎเตเดจเตเดจเต เดเดเตเดเตพ เดชเตเดฐเดคเตเดเตเดทเดฟเดเตเดเตเดจเตเดจเต. เด เดซเตเตผเดฎเดพเดฑเตเดฑเดฟเตฝ เดกเดพเดฑเตเดฑ เดธเดเดญเดฐเดฟเดเตเดเตเดจเตเดจเดคเต เดจเดฎเตเดเตเดเต เดงเดพเดฐเดพเดณเด เดเตเดฃเดเตเดเตพ เดจเตฝเดเตเดจเตเดจเต. เดเดเตเดเดณเตเดเต เดเตฝเดชเตเดชเดจเตเดจเด เดเดคเตเดฐ เดชเตเตผ เดเดชเดฏเตเดเดฟเดเตเดเตเดจเตเดจเต เดเดจเตเดจเดคเตเดชเตเดฒเตเดณเตเดณ เดชเตเดฐเดงเดพเดจเดชเตเดชเตเดเตเด เดเตเดฆเตเดฏเดเตเดเตพเดเตเดเต เดเดชเตเดชเตเตพ เดเดคเตเดคเดฐเด เดจเตฝเดเดพเตป เดคเตเดเดเตเดเดพเด. เดเดพเดฒเดเตเดฐเดฎเตเดฃ เดจเดฟเดเตเดเดณเตเดเต เดเดชเดฏเตเดเตเดคเต เด เดเดฟเดคเตเดคเดฑ เดตเดณเดฐเตเดเดฏเดพเดฃเต? เดเตฝเดชเตเดชเดจเตเดจเดคเตเดคเดฟเดจเตเดฑเต เดเดคเต เดตเดถเดเตเดเดณเตเดฎเดพเดฏเดฟ เดเดณเตเดเตพ เดเตเดเตเดคเตฝ เดเดเดชเดดเดเตเดจเตเดจเต? เดเตเดเดพเดคเต เดเดฃเตเดเดพเดเดพเตป เดชเดพเดเดฟเดฒเตเดฒเดพเดคเตเดคเดฟเดเดคเตเดคเต เดชเดฟเดดเดตเตเดเดณเตเดฃเตเดเต? เด เดเตเดฆเตเดฏเดเตเดเดณเดพเดฃเต เดธเตเดฅเดพเดชเดจเดคเตเดคเดฟเดจเต เดคเดพเตฝเดชเตเดชเดฐเตเดฏเดฎเตเดณเตเดณเดคเต. เด เดเตเดฆเตเดฏเดเตเดเตพเดเตเดเตเดณเตเดณ เดเดคเตเดคเดฐเดเตเดเดณเดฟเตฝ เดจเดฟเดจเตเดจเต เดเดฏเตผเดจเตเดจเตเดตเดฐเตเดจเตเดจ เดธเตเดฅเดฟเดคเดฟเดตเดฟเดตเดฐเดเตเดเดฃเดเตเดเตเดเตพ เด เดเดฟเดธเตเดฅเดพเดจเดฎเดพเดเตเดเดฟ, เดเดเตเดเตพเดเตเดเต เดเตฝเดชเตเดชเดจเตเดจเด เดฎเตเดเตเดเดชเตเดชเตเดเตเดคเตเดคเดพเดจเตเด เดเดชเดฏเตเดเตเดคเต เดเดเดชเดดเดเตฝ เดตเตผเดฆเตเดงเดฟเดชเตเดชเดฟเดเตเดเดพเดจเตเด เดเดดเดฟเดฏเตเด.
เดเดคเตเดคเดฐเดคเตเดคเดฟเดฒเตเดณเตเดณ เดตเตเดฏเดพเดฏเดพเดฎเดคเตเดคเดฟเดจเต เดฌเตเด เดถเดฐเดฟเดเตเดเตเด เดเดชเดฏเตเดเดชเตเดฐเดฆเดฎเดพเดฃเต เดเตเดเดพเดคเต เดฎเดฑเตเดฑเต เดฐเดธเดเดฐเดฎเดพเดฏ เดเดชเดฏเตเด เดเตเดธเตเดเดณเตเด เดเดฃเตเดเต. เดเดฆเดพเดนเดฐเดฃเดคเตเดคเดฟเดจเต, เดจเดฟเดเตเดเตพเดเตเดเต เดธเตเดฑเตเดฑเตเดเตเดเต เดเดฟเดเตเดเต เดกเดพเดฑเตเดฑ เดคเดคเตเดธเดฎเดฏเด เดตเดฟเดถเดเดฒเดจเด เดเตเดฏเตเดฏเดพเดจเตเด เดตเดฟเดถเดเดฒเดจเดคเตเดคเต เด เดเดฟเดธเตเดฅเดพเดจเดฎเดพเดเตเดเดฟ เดเตเดฐเตเดกเตเดเตพ เดจเดเดคเตเดคเดพเดจเตเด เดจเดฟเดเตเดเตพ เดเดเตเดฐเดนเดฟเดเตเดเตเดเตเดเดพเด, เดเดฐเตเดชเดเตเดทเต เดจเดฟเดเตเดเตพเดเตเดเต เดตเดพเดนเดจเดเตเดเดณเดฟเตฝ เดจเดฟเดจเตเดจเตเดณเตเดณ เดธเตเตปเดธเตผ เดกเดพเดฑเตเดฑเดฏเตเด เดเตเดฐเดพเดซเดฟเดเต เดฒเตเดตเตฝ เดเดฃเดเตเดเตเดเตเดเตเดเดฒเตเดเตพ เดเดฃเดเตเดเดพเดเตเดเดพเดจเตเด เดคเดพเตฝเดชเตเดชเดฐเตเดฏเดฎเตเดฃเตเดเดพเดเดพเด. เดเดฆเดพเดนเดฐเดฃเดคเตเดคเดฟเดจเต, เดจเดฟเดเตเดเตพเดเตเดเต เดเดชเดฏเตเดเตเดคเต เดกเดพเดฑเตเดฑ เดถเตเดเดฐเดฟเดเตเดเตเดเดฏเตเด เดชเตเดฐเดงเดพเดจ เด เดณเดตเตเดเตพ เดเตเดฐเดพเดเตเดเตเดเตเดฏเตเดฏเตเดจเตเดจเดคเดฟเดจเต เดกเดพเดทเตโเดฌเตเตผเดกเตเดเตพ เดธเตเดทเตโเดเดฟเดเตเดเดพเตป เด เดคเต เดเดชเดฏเตเดเดฟเดเตเดเตเดเดฏเตเด เดเตเดฏเตเดฏเตเดจเตเดจ เดเดฐเต เดเตเดฏเดฟเดฎเดฟเดเดเต เดเดฎเตเดชเดจเดฟเดฏเดพเดเดพเด. เดถเดฐเดฟ, เดฎเดพเดจเตเดฏเดฐเต, เดเดคเต เดฎเดฑเตเดฑเตเดฐเต เดชเตเดธเตเดฑเตเดฑเดฟเดจเตเดณเตเดณ เดตเดฟเดทเดฏเดฎเดพเดฃเต, เดตเดพเดฏเดฟเดเตเดเดคเดฟเดจเต เดจเดจเตเดฆเดฟ, เดเตเดเดพเดคเต เดฎเตเดดเตเดตเตป เดเตเดกเตเด เดเดพเดฃเดพเตป เดเดเตเดฐเดนเดฟเดเตเดเตเดจเตเดจเดตเตผเดเตเดเต เดเดจเตเดฑเต GitHub-เดฒเตเดเตเดเตเดณเตเดณ เดฒเดฟเดเตเดเต เดเตเดตเดเตเดฏเตเดฃเตเดเต.
เด
เดคเตเดฐเดฎเดพเดคเตเดฐเด.
เด
เดตเดฒเดเดฌเด: www.habr.com