์ŠคํŠธ๋ฆผ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ํŒŒ์ดํ”„๋ผ์ธ์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค. 2 ๋ถ€

Hi everyone. We are sharing the translation of the final part of an article prepared especially for students of the Data Engineer course. You can read the first part here.

Apache Beam and DataFlow for real-time pipelines


Google ํด๋ผ์šฐ๋“œ ์„ค์ •

Note: I had problems running the pipeline under Python 3, so I used Google Cloud Shell to run the pipeline and publish the custom log data. Google Cloud Shell uses Python 2, which plays better with Apache Beam.

ํŒŒ์ดํ”„๋ผ์ธ์„ ์‹œ์ž‘ํ•˜๋ ค๋ฉด ์„ค์ •์„ ์ข€ ๋” ์ž์„ธํžˆ ์‚ดํŽด๋ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ์ด์ „์— GCP๋ฅผ ์‚ฌ์šฉํ•ด ๋ณธ ์ ์ด ์—†๋Š” ๊ฒฝ์šฐ ์—ฌ๊ธฐ์— ์„ค๋ช…๋œ ๋‹ค์Œ 6๋‹จ๊ณ„๋ฅผ ๋”ฐ๋ผ์•ผ ํ•ฉ๋‹ˆ๋‹ค. ํŽ˜์ด์ง€.

๊ทธ๋Ÿฐ ๋‹ค์Œ ์Šคํฌ๋ฆฝํŠธ๋ฅผ Google Cloud Storage์— ์—…๋กœ๋“œํ•˜๊ณ  Google Cloud Shel์— ๋ณต์‚ฌํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ํด๋ผ์šฐ๋“œ ์ €์žฅ์†Œ์— ์—…๋กœ๋“œํ•˜๋Š” ๊ฒƒ์€ ๋งค์šฐ ๊ฐ„๋‹จํ•ฉ๋‹ˆ๋‹ค(์„ค๋ช…์€ ์ฐพ์„ ์ˆ˜ ์žˆ์Œ). ์—ฌ๊ธฐ์—). ํŒŒ์ผ์„ ๋ณต์‚ฌํ•˜๋ ค๋ฉด ์•„๋ž˜ ๊ทธ๋ฆผ 2์˜ ์™ผ์ชฝ์— ์žˆ๋Š” ์ฒซ ๋ฒˆ์งธ ์•„์ด์ฝ˜์„ ํด๋ฆญํ•˜์—ฌ ํˆด๋ฐ”์—์„œ Google Cloud Shel์„ ์—ด ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Figure 2

ํŒŒ์ผ์„ ๋ณต์‚ฌํ•˜๊ณ  ํ•„์š”ํ•œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์„ค์น˜ํ•˜๋Š” ๋ฐ ํ•„์š”ํ•œ ๋ช…๋ น์€ ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋ฐ ํ…Œ์ด๋ธ” ๋งŒ๋“ค๊ธฐ

Once we have completed all the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console: first create a dataset, then follow the steps below to create a table with a schema. Our table will have seven columns, corresponding to the components of each user log. For convenience, we will define all columns as strings, except for the timelocal variable, and name them after the variables we generated earlier. Our table layout should look like Figure 3.
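If you prefer the command line to the console, the same dataset and table can be created with the bq tool. This is a sketch: the dataset and table names are chosen here to match the ones used later in the pipeline code.

```shell
# Create the dataset (name assumed to match the pipeline code below)
bq mk userlogs

# Create the table with the seven string columns described above
bq mk --table userlogs.logdata \
  remote_addr:STRING,timelocal:STRING,request_type:STRING,status:STRING,body_bytes_sent:STRING,http_referer:STRING,http_user_agent:STRING
```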

Figure 3. Table layout

์‚ฌ์šฉ์ž ๋กœ๊ทธ ๋ฐ์ดํ„ฐ ๊ฒŒ์‹œ

Pub/Sub is a critical component of our pipeline, since it allows multiple independent applications to communicate with each other. Specifically, it acts as an intermediary that lets applications send and receive messages. The first thing we need to do is create a topic: go to Pub/Sub in the console and click CREATE TOPIC.
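Equivalently, the topic can be created from the command line. A sketch, with the topic name matching the one used by the publisher script below:

```shell
# Create the Pub/Sub topic used by the publisher and the streaming pipeline
gcloud pubsub topics create userlogs
```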

The code below calls our script to generate the log data defined above, then connects to Pub/Sub and sends the logs. The only things we need to do are create a PublisherClient object, set the path to the topic with the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this from the Google console with:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

ํŒŒ์ผ์ด ์‹คํ–‰๋˜์ž๋งˆ์ž ์•„๋ž˜ ๊ทธ๋ฆผ๊ณผ ๊ฐ™์ด ๋กœ๊ทธ ๋ฐ์ดํ„ฐ๊ฐ€ ์ฝ˜์†”์— ์ถœ๋ ฅ๋˜๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ์Šคํฌ๋ฆฝํŠธ๋Š” ์‚ฌ์šฉํ•˜์ง€ ์•Š๋Š” ํ•œ ์ž‘๋™ํ•ฉ๋‹ˆ๋‹ค. CTRL + C์™„๋ฃŒํ•ฉ๋‹ˆ๋‹ค.

Figure 4. Output of publish_logs.py

ํŒŒ์ดํ”„๋ผ์ธ ์ฝ”๋“œ ์ž‘์„ฑ

Now that everything is set up, we can get to the fun part: coding our pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have created the pipeline object, we can apply multiple functions one after another using the pipe operator (|). In general, the workflow looks like the image below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])
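To make the `|` chaining concrete without running Beam itself, here is a toy sketch of the idiom. The stand-in class is purely illustrative and is not how Beam is actually implemented:

```python
class PCollectionSketch:
    """Toy stand-in for a Beam PCollection: `|` applies a transform."""
    def __init__(self, items):
        self.items = list(items)

    def __or__(self, transform):
        # Each transform maps the whole collection to a new collection
        return PCollectionSketch(transform(self.items))

initial = PCollectionSketch(["a,1", "b,2"])
final = (initial
         | (lambda rows: [r.split(",") for r in rows])  # First Transform
         | (lambda rows: [r[0] for r in rows]))         # Second Transform
print(final.items)  # -> ['a', 'b']
```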

In our code, we create two custom functions. The regex_clean function scans the data and extracts the matching fields from each row based on the PATTERNS list, using re.search. The function returns a comma-separated string. If you are not a regular-expression expert, I recommend going through a tutorial and practicing in a scratch pad to verify the code.

After that, we define a custom ParDo function called Split, which is a variant of the Beam transform for parallel processing. In Python this is done in a particular way: we have to create a class that inherits from Beam's DoFn class. The Split function takes the parsed row from the previous function and returns a list of dictionaries whose keys correspond to the column names in our BigQuery table. One thing to note about this function: I had to import datetime inside the function for it to work; importing it at the top of the file produced an import error, which was odd. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and the streaming code is that in the batch job we read the CSV from src_path using Beam's ReadFromText function.
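As a quick illustration of what one entry in the PATTERNS list does, the made-up log line below (in the same shape as our generated user logs) has its status code pulled out by `re.search`:

```python
import re

# A made-up log line in the same shape as the generated user logs
line = '192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/list HTTP/1.1" 200 1163'

# The status-code pattern from the PATTERNS list: digits between whitespace
status = re.search(r'\s(\d+)\s', line).group(1)
print(status)  # -> 200
```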

Batch DataFlow Job (batch processing)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
           r'"(\S+)\s(\S+)\s*(\S*)"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()
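The timestamp conversion inside `Split` can be checked on its own; this standalone sketch reproduces the same strptime/strftime round trip:

```python
from datetime import datetime

# Parse the nginx-style timestamp and reformat it for BigQuery
d = datetime.strptime("30/Apr/2019:21:11:42", "%d/%b/%Y:%H:%M:%S")
print(d.strftime("%Y-%m-%d %H:%M:%S"))  # -> 2019-04-30 21:11:42
```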

์ŠคํŠธ๋ฆฌ๋ฐ DataFlow ์ž‘์—…(์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
           r'"(\S+)\s(\S+)\s*(\S*)"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()

์ปจ๋ฒ ์ด์–ด ์‹œ์ž‘

There are several ways to run the pipeline. If we wanted to, we could simply run it locally from a terminal while logged in to GCP remotely.

python main_pipeline_stream.py \
 --input_topic "projects/user-logs-237110/topics/userlogs" \
 --streaming

However, we will run it with DataFlow. We can do this with the command below, after setting the following required parameters.

  • project — the ID of your GCP project.
  • runner — the pipeline runner that will analyze the program and build the pipeline. To run in the cloud, you must specify DataflowRunner.
  • staging_location — the path to Cloud Dataflow storage for staging the code packages needed by the workers executing the job.
  • temp_location — the path to Cloud Dataflow storage for temporary job files created while the pipeline runs.
  • streaming

python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming

์ด ๋ช…๋ น์ด ์‹คํ–‰๋˜๋Š” ๋™์•ˆ Google ์ฝ˜์†”์˜ DataFlow ํƒญ์œผ๋กœ ์ด๋™ํ•˜์—ฌ ํŒŒ์ดํ”„๋ผ์ธ์„ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํŒŒ์ดํ”„๋ผ์ธ์„ ํด๋ฆญํ•˜๋ฉด ๊ทธ๋ฆผ 4์™€ ๋น„์Šทํ•œ ๋‚ด์šฉ์ด ํ‘œ์‹œ๋ฉ๋‹ˆ๋‹ค. ๋””๋ฒ„๊น… ๋ชฉ์ ์œผ๋กœ ๋กœ๊ทธ๋กœ ์ด๋™ํ•œ ๋‹ค์Œ Stackdriver๋กœ ์ด๋™ํ•˜์—ฌ ์ž์„ธํ•œ ๋กœ๊ทธ๋ฅผ ๋ณด๋Š” ๊ฒƒ์ด ๋งค์šฐ ์œ ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Š” ์—ฌ๋Ÿฌ ๊ฒฝ์šฐ์— ํŒŒ์ดํ”„๋ผ์ธ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๋Š” ๋ฐ ๋„์›€์ด ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.

Figure 4: Beam pipeline

BigQuery์—์„œ ๋ฐ์ดํ„ฐ์— ์•ก์„ธ์Šค

๋”ฐ๋ผ์„œ ํ…Œ์ด๋ธ”๋กœ ๋ฐ์ดํ„ฐ๊ฐ€ ํ๋ฅด๋Š” ํŒŒ์ดํ”„๋ผ์ธ์ด ์ด๋ฏธ ์‹คํ–‰๋˜๊ณ  ์žˆ์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ์ด๋ฅผ ํ…Œ์ŠคํŠธํ•˜๋ ค๋ฉด BigQuery๋กœ ์ด๋™ํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ์‚ดํŽด๋ณด์„ธ์š”. ์•„๋ž˜ ๋ช…๋ น์„ ์‚ฌ์šฉํ•˜๋ฉด ๋ฐ์ดํ„ฐ ์„ธํŠธ์˜ ์ฒ˜์Œ ๋ช‡ ํ–‰์ด ํ‘œ์‹œ๋ฉ๋‹ˆ๋‹ค. ์ด์ œ BigQuery์— ๋ฐ์ดํ„ฐ๊ฐ€ ์ €์žฅ๋˜์—ˆ์œผ๋ฏ€๋กœ ์ถ”๊ฐ€ ๋ถ„์„์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ์„ ๋ฟ๋งŒ ์•„๋‹ˆ๋ผ ๋™๋ฃŒ์™€ ๋ฐ์ดํ„ฐ๋ฅผ ๊ณต์œ ํ•˜๊ณ  ๋น„์ฆˆ๋‹ˆ์Šค ์งˆ๋ฌธ์— ๋Œ€ํ•œ ๋‹ต๋ณ€์„ ์‹œ์ž‘ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
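The same query can also be run from Cloud Shell with the bq tool (a sketch; substitute your own project ID):

```shell
bq query --use_legacy_sql=false \
  'SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;'
```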

Figure 5: BigQuery

Conclusion

์ด ๊ฒŒ์‹œ๋ฌผ์ด ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์„ ์ƒ์„ฑํ•˜๊ณ  ๋ฐ์ดํ„ฐ์— ๋” ์‰ฝ๊ฒŒ ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ๋Š” ๋ฐฉ๋ฒ•์„ ์ฐพ๋Š” ์œ ์šฉํ•œ ์˜ˆ๊ฐ€ ๋˜๊ธฐ๋ฅผ ๋ฐ”๋ž๋‹ˆ๋‹ค. ์ด ํ˜•์‹์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•˜๋ฉด ๋งŽ์€ ์ด์ ์„ ์–ป์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด์ œ ์–ผ๋งˆ๋‚˜ ๋งŽ์€ ์‚ฌ๋žŒ๋“ค์ด ์šฐ๋ฆฌ ์ œํ’ˆ์„ ์‚ฌ์šฉํ•˜๊ณ  ์žˆ๋Š”์ง€์™€ ๊ฐ™์€ ์ค‘์š”ํ•œ ์งˆ๋ฌธ์— ๋‹ตํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์‹œ๊ฐ„์ด ์ง€๋‚จ์— ๋”ฐ๋ผ ์‚ฌ์šฉ์ž ๊ธฐ๋ฐ˜์ด ์ฆ๊ฐ€ํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๊นŒ? ์‚ฌ๋žŒ๋“ค์ด ์ œํ’ˆ์˜ ์–ด๋–ค ์ธก๋ฉด๊ณผ ๊ฐ€์žฅ ๋งŽ์ด ์ƒํ˜ธ ์ž‘์šฉํ•ฉ๋‹ˆ๊นŒ? ๊ทธ๋ฆฌ๊ณ  ์žˆ์–ด์„œ๋Š” ์•ˆ ๋˜๋Š” ๊ณณ์— ์˜ค๋ฅ˜๊ฐ€ ์žˆ๋‚˜์š”? ์ด๋Š” ์กฐ์ง์ด ๊ด€์‹ฌ์„ ๊ฐ€์งˆ ์งˆ๋ฌธ์ž…๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ์งˆ๋ฌธ์— ๋Œ€ํ•œ ๋‹ต๋ณ€์—์„œ ์–ป์€ ํ†ต์ฐฐ๋ ฅ์„ ๋ฐ”ํƒ•์œผ๋กœ ์ œํ’ˆ์„ ๊ฐœ์„ ํ•˜๊ณ  ์‚ฌ์šฉ์ž ์ฐธ์—ฌ๋ฅผ ๋†’์ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Beam์€ ์ด๋Ÿฌํ•œ ์œ ํ˜•์˜ ์šด๋™์— ์ •๋ง ์œ ์šฉํ•˜๋ฉฐ ๋‹ค๋ฅธ ํฅ๋ฏธ๋กœ์šด ์‚ฌ์šฉ ์‚ฌ๋ก€๋„ ๋งŽ์ด ์žˆ์Šต๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, ์ฃผ์‹ ํ‹ฑ ๋ฐ์ดํ„ฐ๋ฅผ ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋ถ„์„ํ•˜๊ณ  ๋ถ„์„์„ ๊ธฐ๋ฐ˜์œผ๋กœ ๊ฑฐ๋ž˜๋ฅผ ํ•˜๊ณ  ์‹ถ์„ ์ˆ˜๋„ ์žˆ๊ณ , ์ฐจ๋Ÿ‰์—์„œ ์˜ค๋Š” ์„ผ์„œ ๋ฐ์ดํ„ฐ๊ฐ€ ์žˆ๊ณ  ๊ตํ†ต ์ˆ˜์ค€ ๊ณ„์‚ฐ์„ ๊ณ„์‚ฐํ•˜๋ ค๊ณ  ํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, ์‚ฌ์šฉ์ž ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์ง‘ํ•˜๊ณ  ์ด๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ฃผ์š” ์ง€ํ‘œ๋ฅผ ์ถ”์ ํ•˜๋Š” ๋Œ€์‹œ๋ณด๋“œ๋ฅผ ๋งŒ๋“œ๋Š” ๊ฒŒ์ž„ ํšŒ์‚ฌ๊ฐ€ ๋  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค. ์ข‹์•„, ์—ฌ๋Ÿฌ๋ถ„, ์ด๊ฒƒ์€ ๋‹ค๋ฅธ ๊ฒŒ์‹œ๋ฌผ์˜ ์ฃผ์ œ์ž…๋‹ˆ๋‹ค. ์ฝ์–ด์ฃผ์…”์„œ ๊ฐ์‚ฌํ•ฉ๋‹ˆ๋‹ค. ์ „์ฒด ์ฝ”๋“œ๋ฅผ ๋ณด๊ณ  ์‹ถ์€ ๋ถ„๋“ค์„ ์œ„ํ•ด ์•„๋ž˜์— ๋‚ด GitHub ๋งํฌ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

https://github.com/DFoly/User_log_pipeline

That's all. Read the first part.

Source: habr.com
