ನಾವು ಸ್ಟ್ರೀಮ್ ಡೇಟಾ ಸಂಸ್ಕರಣಾ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ರಚಿಸುತ್ತೇವೆ. ಭಾಗ 2

ಎಲ್ಲರಿಗು ನಮಸ್ಖರ. ನಾವು ಲೇಖನದ ಅಂತಿಮ ಭಾಗದ ಅನುವಾದವನ್ನು ಹಂಚಿಕೊಳ್ಳುತ್ತಿದ್ದೇವೆ, ನಿರ್ದಿಷ್ಟವಾಗಿ ಕೋರ್ಸ್‌ನ ವಿದ್ಯಾರ್ಥಿಗಳಿಗೆ ಸಿದ್ಧಪಡಿಸಲಾಗಿದೆ. ಡೇಟಾ ಇಂಜಿನಿಯರ್. ನೀವು ಮೊದಲ ಭಾಗವನ್ನು ಓದಬಹುದು ಇಲ್ಲಿ.

ರಿಯಲ್-ಟೈಮ್ ಪೈಪ್‌ಲೈನ್‌ಗಳಿಗಾಗಿ ಅಪಾಚೆ ಬೀಮ್ ಮತ್ತು ಡೇಟಾಫ್ಲೋ

ನಾವು ಸ್ಟ್ರೀಮ್ ಡೇಟಾ ಸಂಸ್ಕರಣಾ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ರಚಿಸುತ್ತೇವೆ. ಭಾಗ 2

Google ಮೇಘವನ್ನು ಹೊಂದಿಸಲಾಗುತ್ತಿದೆ

ಗಮನಿಸಿ: ಪೈಥಾನ್ 3 ರಲ್ಲಿ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ಚಾಲನೆ ಮಾಡುವಲ್ಲಿ ನನಗೆ ತೊಂದರೆಯಿರುವ ಕಾರಣ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ರನ್ ಮಾಡಲು ಮತ್ತು ಕಸ್ಟಮ್ ಲಾಗ್ ಡೇಟಾವನ್ನು ಪ್ರಕಟಿಸಲು ನಾನು Google ಕ್ಲೌಡ್ ಶೆಲ್ ಅನ್ನು ಬಳಸಿದ್ದೇನೆ. ಗೂಗಲ್ ಕ್ಲೌಡ್ ಶೆಲ್ ಪೈಥಾನ್ 2 ಅನ್ನು ಬಳಸುತ್ತದೆ, ಇದು ಅಪಾಚೆ ಬೀಮ್‌ನೊಂದಿಗೆ ಹೆಚ್ಚು ಸ್ಥಿರವಾಗಿರುತ್ತದೆ.

ಪೈಪ್ಲೈನ್ ​​ಅನ್ನು ಪ್ರಾರಂಭಿಸಲು, ನಾವು ಸೆಟ್ಟಿಂಗ್ಗಳಲ್ಲಿ ಸ್ವಲ್ಪ ಡಿಗ್ ಮಾಡಬೇಕಾಗಿದೆ. ನಿಮ್ಮಲ್ಲಿ ಮೊದಲು GCP ಅನ್ನು ಬಳಸದೆ ಇರುವವರಿಗೆ, ಇದರಲ್ಲಿ ವಿವರಿಸಿರುವ ಕೆಳಗಿನ 6 ಹಂತಗಳನ್ನು ನೀವು ಅನುಸರಿಸಬೇಕಾಗುತ್ತದೆ ಪುಟ.

ಇದರ ನಂತರ, ನಾವು ನಮ್ಮ ಸ್ಕ್ರಿಪ್ಟ್‌ಗಳನ್ನು Google ಮೇಘ ಸಂಗ್ರಹಣೆಗೆ ಅಪ್‌ಲೋಡ್ ಮಾಡಬೇಕಾಗುತ್ತದೆ ಮತ್ತು ಅವುಗಳನ್ನು ನಮ್ಮ Google ಕ್ಲೌಡ್ ಶೆಲ್‌ಗೆ ನಕಲಿಸಬೇಕಾಗುತ್ತದೆ. ಕ್ಲೌಡ್ ಸಂಗ್ರಹಣೆಗೆ ಅಪ್‌ಲೋಡ್ ಮಾಡುವುದು ತುಂಬಾ ಕ್ಷುಲ್ಲಕವಾಗಿದೆ (ವಿವರಣೆಯನ್ನು ಕಾಣಬಹುದು ಇಲ್ಲಿ) ನಮ್ಮ ಫೈಲ್‌ಗಳನ್ನು ನಕಲಿಸಲು, ಕೆಳಗಿನ ಚಿತ್ರ 2 ರಲ್ಲಿ ಎಡಭಾಗದಲ್ಲಿರುವ ಮೊದಲ ಐಕಾನ್ ಅನ್ನು ಕ್ಲಿಕ್ ಮಾಡುವ ಮೂಲಕ ನಾವು ಟೂಲ್‌ಬಾರ್‌ನಿಂದ Google ಕ್ಲೌಡ್ ಶೆಲ್ ಅನ್ನು ತೆರೆಯಬಹುದು.

ನಾವು ಸ್ಟ್ರೀಮ್ ಡೇಟಾ ಸಂಸ್ಕರಣಾ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ರಚಿಸುತ್ತೇವೆ. ಭಾಗ 2
ಚಿತ್ರ 2

ನಾವು ಫೈಲ್‌ಗಳನ್ನು ನಕಲಿಸಲು ಮತ್ತು ಅಗತ್ಯವಿರುವ ಲೈಬ್ರರಿಗಳನ್ನು ಸ್ಥಾಪಿಸಲು ಅಗತ್ಯವಿರುವ ಆಜ್ಞೆಗಳನ್ನು ಕೆಳಗೆ ಪಟ್ಟಿ ಮಾಡಲಾಗಿದೆ.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

ನಮ್ಮ ಡೇಟಾಬೇಸ್ ಮತ್ತು ಟೇಬಲ್ ಅನ್ನು ರಚಿಸುವುದು

ಒಮ್ಮೆ ನಾವು ಎಲ್ಲಾ ಸೆಟಪ್ ಸಂಬಂಧಿತ ಹಂತಗಳನ್ನು ಪೂರ್ಣಗೊಳಿಸಿದ ನಂತರ, ನಾವು ಮಾಡಬೇಕಾದ ಮುಂದಿನ ಕೆಲಸವೆಂದರೆ BigQuery ನಲ್ಲಿ ಡೇಟಾಸೆಟ್ ಮತ್ತು ಟೇಬಲ್ ಅನ್ನು ರಚಿಸುವುದು. ಇದನ್ನು ಮಾಡಲು ಹಲವಾರು ಮಾರ್ಗಗಳಿವೆ, ಆದರೆ ಮೊದಲು ಡೇಟಾಸೆಟ್ ಅನ್ನು ರಚಿಸುವ ಮೂಲಕ Google ಕ್ಲೌಡ್ ಕನ್ಸೋಲ್ ಅನ್ನು ಬಳಸುವುದು ಸರಳವಾಗಿದೆ. ನೀವು ಕೆಳಗಿನ ಹಂತಗಳನ್ನು ಅನುಸರಿಸಬಹುದು ಲಿಂಕ್ಸ್ಕೀಮಾದೊಂದಿಗೆ ಟೇಬಲ್ ರಚಿಸಲು. ನಮ್ಮ ಟೇಬಲ್ ಹೊಂದಿರುತ್ತದೆ 7 ಕಾಲಮ್‌ಗಳು, ಪ್ರತಿ ಬಳಕೆದಾರ ಲಾಗ್‌ನ ಘಟಕಗಳಿಗೆ ಅನುರೂಪವಾಗಿದೆ. ಅನುಕೂಲಕ್ಕಾಗಿ, ನಾವು ಟೈಮ್‌ಲೋಕಲ್ ವೇರಿಯಬಲ್ ಹೊರತುಪಡಿಸಿ ಎಲ್ಲಾ ಕಾಲಮ್‌ಗಳನ್ನು ಸ್ಟ್ರಿಂಗ್‌ಗಳಾಗಿ ವ್ಯಾಖ್ಯಾನಿಸುತ್ತೇವೆ ಮತ್ತು ನಾವು ಮೊದಲು ರಚಿಸಿದ ವೇರಿಯಬಲ್‌ಗಳ ಪ್ರಕಾರ ಅವುಗಳನ್ನು ಹೆಸರಿಸುತ್ತೇವೆ. ನಮ್ಮ ಮೇಜಿನ ವಿನ್ಯಾಸವು ಚಿತ್ರ 3 ರಲ್ಲಿರುವಂತೆ ಇರಬೇಕು.

ನಾವು ಸ್ಟ್ರೀಮ್ ಡೇಟಾ ಸಂಸ್ಕರಣಾ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ರಚಿಸುತ್ತೇವೆ. ಭಾಗ 2
ಚಿತ್ರ 3. ಟೇಬಲ್ ಲೇಔಟ್

ಬಳಕೆದಾರರ ಲಾಗ್ ಡೇಟಾವನ್ನು ಪ್ರಕಟಿಸಲಾಗುತ್ತಿದೆ

ಪಬ್/ಸಬ್ ನಮ್ಮ ಪೈಪ್‌ಲೈನ್‌ನ ನಿರ್ಣಾಯಕ ಅಂಶವಾಗಿದೆ ಏಕೆಂದರೆ ಇದು ಬಹು ಸ್ವತಂತ್ರ ಅಪ್ಲಿಕೇಶನ್‌ಗಳನ್ನು ಪರಸ್ಪರ ಸಂವಹನ ಮಾಡಲು ಅನುಮತಿಸುತ್ತದೆ. ನಿರ್ದಿಷ್ಟವಾಗಿ ಹೇಳುವುದಾದರೆ, ಅಪ್ಲಿಕೇಶನ್‌ಗಳ ನಡುವೆ ಸಂದೇಶಗಳನ್ನು ಕಳುಹಿಸಲು ಮತ್ತು ಸ್ವೀಕರಿಸಲು ನಮಗೆ ಅನುಮತಿಸುವ ಮಧ್ಯವರ್ತಿಯಾಗಿ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ. ನಾವು ಮಾಡಬೇಕಾದ ಮೊದಲ ವಿಷಯವೆಂದರೆ ವಿಷಯವನ್ನು ರಚಿಸುವುದು. ಕನ್ಸೋಲ್‌ನಲ್ಲಿ ಪಬ್/ಸಬ್‌ಗೆ ಹೋಗಿ ಮತ್ತು ವಿಷಯವನ್ನು ರಚಿಸಿ ಕ್ಲಿಕ್ ಮಾಡಿ.

ಕೆಳಗಿನ ಕೋಡ್ ಮೇಲೆ ವಿವರಿಸಿದ ಲಾಗ್ ಡೇಟಾವನ್ನು ರಚಿಸಲು ನಮ್ಮ ಸ್ಕ್ರಿಪ್ಟ್ ಅನ್ನು ಕರೆಯುತ್ತದೆ ಮತ್ತು ನಂತರ ಲಾಗ್‌ಗಳನ್ನು ಪಬ್/ಸಬ್‌ಗೆ ಸಂಪರ್ಕಿಸುತ್ತದೆ ಮತ್ತು ಕಳುಹಿಸುತ್ತದೆ. ನಾವು ಮಾಡಬೇಕಾದ ಏಕೈಕ ವಿಷಯವೆಂದರೆ ವಸ್ತುವನ್ನು ರಚಿಸುವುದು ಪ್ರಕಾಶಕ ಗ್ರಾಹಕ, ವಿಧಾನವನ್ನು ಬಳಸಿಕೊಂಡು ವಿಷಯದ ಮಾರ್ಗವನ್ನು ಸೂಚಿಸಿ topic_path ಮತ್ತು ಕಾರ್ಯವನ್ನು ಕರೆ ಮಾಡಿ publish с topic_path ಮತ್ತು ಡೇಟಾ. ನಾವು ಆಮದು ಮಾಡಿಕೊಳ್ಳುತ್ತೇವೆ ಎಂಬುದನ್ನು ದಯವಿಟ್ಟು ಗಮನಿಸಿ generate_log_line ನಮ್ಮ ಲಿಪಿಯಿಂದ stream_logs, ಆದ್ದರಿಂದ ಈ ಫೈಲ್‌ಗಳು ಒಂದೇ ಫೋಲ್ಡರ್‌ನಲ್ಲಿವೆ ಎಂದು ಖಚಿತಪಡಿಸಿಕೊಳ್ಳಿ, ಇಲ್ಲದಿದ್ದರೆ ನೀವು ಆಮದು ದೋಷವನ್ನು ಪಡೆಯುತ್ತೀರಿ. ನಂತರ ನಾವು ಇದನ್ನು ನಮ್ಮ Google ಕನ್ಸೋಲ್ ಮೂಲಕ ಚಲಾಯಿಸಬಹುದು:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

ಫೈಲ್ ರನ್ ಆದ ತಕ್ಷಣ, ಕೆಳಗಿನ ಚಿತ್ರದಲ್ಲಿ ತೋರಿಸಿರುವಂತೆ ಕನ್ಸೋಲ್‌ಗೆ ಲಾಗ್ ಡೇಟಾದ ಔಟ್‌ಪುಟ್ ಅನ್ನು ನೋಡಲು ನಮಗೆ ಸಾಧ್ಯವಾಗುತ್ತದೆ. ನಾವು ಬಳಸದಿರುವವರೆಗೆ ಈ ಸ್ಕ್ರಿಪ್ಟ್ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ CTRL + Cಅದನ್ನು ಪೂರ್ಣಗೊಳಿಸಲು.

ನಾವು ಸ್ಟ್ರೀಮ್ ಡೇಟಾ ಸಂಸ್ಕರಣಾ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ರಚಿಸುತ್ತೇವೆ. ಭಾಗ 2
ಚಿತ್ರ 4. ಔಟ್ಪುಟ್ publish_logs.py

ನಮ್ಮ ಪೈಪ್ಲೈನ್ ​​ಕೋಡ್ ಬರೆಯುವುದು

ಈಗ ನಾವು ಎಲ್ಲವನ್ನೂ ಸಿದ್ಧಪಡಿಸಿದ್ದೇವೆ, ನಾವು ಮೋಜಿನ ಭಾಗವನ್ನು ಪ್ರಾರಂಭಿಸಬಹುದು - ಬೀಮ್ ಮತ್ತು ಪೈಥಾನ್ ಬಳಸಿ ನಮ್ಮ ಪೈಪ್‌ಲೈನ್ ಕೋಡಿಂಗ್. ಬೀಮ್ ಪೈಪ್‌ಲೈನ್ ರಚಿಸಲು, ನಾವು ಪೈಪ್‌ಲೈನ್ ವಸ್ತುವನ್ನು (p) ರಚಿಸಬೇಕಾಗಿದೆ. ಒಮ್ಮೆ ನಾವು ಪೈಪ್‌ಲೈನ್ ಆಬ್ಜೆಕ್ಟ್ ಅನ್ನು ರಚಿಸಿದ ನಂತರ, ಆಪರೇಟರ್ ಅನ್ನು ಬಳಸಿಕೊಂಡು ನಾವು ಒಂದರ ನಂತರ ಒಂದರಂತೆ ಅನೇಕ ಕಾರ್ಯಗಳನ್ನು ಅನ್ವಯಿಸಬಹುದು pipe (|). ಸಾಮಾನ್ಯವಾಗಿ, ಕೆಲಸದ ಹರಿವು ಕೆಳಗಿನ ಚಿತ್ರದಂತೆ ಕಾಣುತ್ತದೆ.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

ನಮ್ಮ ಕೋಡ್‌ನಲ್ಲಿ, ನಾವು ಎರಡು ಕಸ್ಟಮ್ ಕಾರ್ಯಗಳನ್ನು ರಚಿಸುತ್ತೇವೆ. ಕಾರ್ಯ regex_clean, ಇದು ಡೇಟಾವನ್ನು ಸ್ಕ್ಯಾನ್ ಮಾಡುತ್ತದೆ ಮತ್ತು ಕಾರ್ಯವನ್ನು ಬಳಸಿಕೊಂಡು ಪ್ಯಾಟರ್ನ್ಸ್ ಪಟ್ಟಿಯನ್ನು ಆಧರಿಸಿ ಅನುಗುಣವಾದ ಸಾಲನ್ನು ಹಿಂಪಡೆಯುತ್ತದೆ re.search. ಕಾರ್ಯವು ಅಲ್ಪವಿರಾಮದಿಂದ ಬೇರ್ಪಟ್ಟ ಸ್ಟ್ರಿಂಗ್ ಅನ್ನು ಹಿಂತಿರುಗಿಸುತ್ತದೆ. ನೀವು ನಿಯಮಿತ ಅಭಿವ್ಯಕ್ತಿ ತಜ್ಞರಲ್ಲದಿದ್ದರೆ, ಇದನ್ನು ಪರಿಶೀಲಿಸಲು ನಾನು ಶಿಫಾರಸು ಮಾಡುತ್ತೇವೆ ಟ್ಯುಟೋರಿಯಲ್ ಮತ್ತು ಕೋಡ್ ಅನ್ನು ಪರಿಶೀಲಿಸಲು ನೋಟ್‌ಪ್ಯಾಡ್‌ನಲ್ಲಿ ಅಭ್ಯಾಸ ಮಾಡಿ. ಇದರ ನಂತರ ನಾವು ಕಸ್ಟಮ್ ParDo ಕಾರ್ಯವನ್ನು ವ್ಯಾಖ್ಯಾನಿಸುತ್ತೇವೆ ಒಡೆದ, ಇದು ಸಮಾನಾಂತರ ಪ್ರಕ್ರಿಯೆಗಾಗಿ ಬೀಮ್ ರೂಪಾಂತರದ ಬದಲಾವಣೆಯಾಗಿದೆ. ಪೈಥಾನ್‌ನಲ್ಲಿ, ಇದನ್ನು ವಿಶೇಷ ರೀತಿಯಲ್ಲಿ ಮಾಡಲಾಗುತ್ತದೆ - ನಾವು DoFn ಬೀಮ್ ವರ್ಗದಿಂದ ಆನುವಂಶಿಕವಾಗಿ ಪಡೆಯುವ ವರ್ಗವನ್ನು ರಚಿಸಬೇಕು. ಸ್ಪ್ಲಿಟ್ ಫಂಕ್ಷನ್ ಹಿಂದಿನ ಫಂಕ್ಷನ್‌ನಿಂದ ಪಾರ್ಸ್ ಮಾಡಿದ ಸಾಲನ್ನು ತೆಗೆದುಕೊಳ್ಳುತ್ತದೆ ಮತ್ತು ನಮ್ಮ BigQuery ಟೇಬಲ್‌ನಲ್ಲಿನ ಕಾಲಮ್ ಹೆಸರುಗಳಿಗೆ ಅನುಗುಣವಾದ ಕೀಗಳನ್ನು ಹೊಂದಿರುವ ನಿಘಂಟುಗಳ ಪಟ್ಟಿಯನ್ನು ಹಿಂತಿರುಗಿಸುತ್ತದೆ. ಈ ಕಾರ್ಯದ ಬಗ್ಗೆ ಗಮನಿಸಬೇಕಾದ ಅಂಶವಿದೆ: ನಾನು ಆಮದು ಮಾಡಿಕೊಳ್ಳಬೇಕಾಗಿತ್ತು datetime ಇದು ಕೆಲಸ ಮಾಡಲು ಒಂದು ಕಾರ್ಯದ ಒಳಗೆ. ಫೈಲ್‌ನ ಪ್ರಾರಂಭದಲ್ಲಿ ನಾನು ಆಮದು ದೋಷವನ್ನು ಪಡೆಯುತ್ತಿದ್ದೆ, ಅದು ವಿಚಿತ್ರವಾಗಿದೆ. ನಂತರ ಈ ಪಟ್ಟಿಯನ್ನು ಕಾರ್ಯಕ್ಕೆ ರವಾನಿಸಲಾಗುತ್ತದೆ WriteToBigQuery, ಇದು ಸರಳವಾಗಿ ನಮ್ಮ ಡೇಟಾವನ್ನು ಟೇಬಲ್‌ಗೆ ಸೇರಿಸುತ್ತದೆ. ಬ್ಯಾಚ್ ಡೇಟಾಫ್ಲೋ ಜಾಬ್ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾಫ್ಲೋ ಜಾಬ್‌ಗಾಗಿ ಕೋಡ್ ಅನ್ನು ಕೆಳಗೆ ನೀಡಲಾಗಿದೆ. ಬ್ಯಾಚ್ ಮತ್ತು ಸ್ಟ್ರೀಮಿಂಗ್ ಕೋಡ್ ನಡುವಿನ ವ್ಯತ್ಯಾಸವೆಂದರೆ ಬ್ಯಾಚ್‌ನಲ್ಲಿ ನಾವು CSV ಅನ್ನು ಓದುತ್ತೇವೆ src_pathಕಾರ್ಯವನ್ನು ಬಳಸುವುದು ReadFromText ಕಿರಣದಿಂದ.

ಬ್ಯಾಚ್ ಡೇಟಾಫ್ಲೋ ಜಾಬ್ (ಬ್ಯಾಚ್ ಪ್ರಕ್ರಿಯೆ)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾಫ್ಲೋ ಜಾಬ್ (ಸ್ಟ್ರೀಮ್ ಪ್ರಕ್ರಿಯೆ)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

ಕನ್ವೇಯರ್ ಅನ್ನು ಪ್ರಾರಂಭಿಸಲಾಗುತ್ತಿದೆ

ನಾವು ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ವಿವಿಧ ರೀತಿಯಲ್ಲಿ ಚಲಾಯಿಸಬಹುದು. ನಾವು ಬಯಸಿದರೆ, ರಿಮೋಟ್ ಆಗಿ GCP ಗೆ ಲಾಗ್ ಇನ್ ಮಾಡುವಾಗ ನಾವು ಅದನ್ನು ಟರ್ಮಿನಲ್‌ನಿಂದ ಸ್ಥಳೀಯವಾಗಿ ಚಲಾಯಿಸಬಹುದು.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

ಆದಾಗ್ಯೂ, ನಾವು ಅದನ್ನು DataFlow ಬಳಸಿ ಚಲಾಯಿಸಲಿದ್ದೇವೆ. ಕೆಳಗಿನ ಅಗತ್ಯವಿರುವ ನಿಯತಾಂಕಗಳನ್ನು ಹೊಂದಿಸುವ ಮೂಲಕ ಕೆಳಗಿನ ಆಜ್ಞೆಯನ್ನು ಬಳಸಿಕೊಂಡು ನಾವು ಇದನ್ನು ಮಾಡಬಹುದು.

  • project — ನಿಮ್ಮ GCP ಯೋಜನೆಯ ID.
  • runner ಪೈಪ್‌ಲೈನ್ ರನ್ನರ್ ಆಗಿದ್ದು ಅದು ನಿಮ್ಮ ಪ್ರೋಗ್ರಾಂ ಅನ್ನು ವಿಶ್ಲೇಷಿಸುತ್ತದೆ ಮತ್ತು ನಿಮ್ಮ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ನಿರ್ಮಿಸುತ್ತದೆ. ಕ್ಲೌಡ್‌ನಲ್ಲಿ ರನ್ ಮಾಡಲು, ನೀವು DataflowRunner ಅನ್ನು ನಿರ್ದಿಷ್ಟಪಡಿಸಬೇಕು.
  • staging_location — ಕೆಲಸವನ್ನು ನಿರ್ವಹಿಸುವ ಪ್ರೊಸೆಸರ್‌ಗಳಿಗೆ ಅಗತ್ಯವಿರುವ ಕೋಡ್ ಪ್ಯಾಕೇಜ್‌ಗಳನ್ನು ಸೂಚಿಕೆ ಮಾಡಲು ಕ್ಲೌಡ್ ಡೇಟಾಫ್ಲೋ ಕ್ಲೌಡ್ ಸಂಗ್ರಹಣೆಯ ಮಾರ್ಗ.
  • temp_location — ಪೈಪ್‌ಲೈನ್ ಚಾಲನೆಯಲ್ಲಿರುವಾಗ ರಚಿಸಲಾದ ತಾತ್ಕಾಲಿಕ ಉದ್ಯೋಗ ಫೈಲ್‌ಗಳನ್ನು ಸಂಗ್ರಹಿಸಲು ಕ್ಲೌಡ್ ಡೇಟಾಫ್ಲೋ ಕ್ಲೌಡ್ ಸಂಗ್ರಹಣೆಗೆ ಮಾರ್ಗ.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

ಈ ಆಜ್ಞೆಯು ಚಾಲನೆಯಲ್ಲಿರುವಾಗ, ನಾವು google console ನಲ್ಲಿ DataFlow ಟ್ಯಾಬ್‌ಗೆ ಹೋಗಬಹುದು ಮತ್ತು ನಮ್ಮ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ವೀಕ್ಷಿಸಬಹುದು. ನಾವು ಪೈಪ್‌ಲೈನ್‌ನಲ್ಲಿ ಕ್ಲಿಕ್ ಮಾಡಿದಾಗ, ಚಿತ್ರ 4 ರಂತೆಯೇ ನಾವು ಏನನ್ನಾದರೂ ನೋಡಬೇಕು. ಡೀಬಗ್ ಮಾಡುವ ಉದ್ದೇಶಗಳಿಗಾಗಿ, ವಿವರವಾದ ಲಾಗ್‌ಗಳನ್ನು ವೀಕ್ಷಿಸಲು ಲಾಗ್‌ಗಳಿಗೆ ಮತ್ತು ನಂತರ ಸ್ಟಾಕ್‌ಡ್ರೈವರ್‌ಗೆ ಹೋಗಲು ಇದು ತುಂಬಾ ಸಹಾಯಕವಾಗಬಹುದು. ಇದು ಹಲವಾರು ಸಂದರ್ಭಗಳಲ್ಲಿ ಪೈಪ್‌ಲೈನ್ ಸಮಸ್ಯೆಗಳನ್ನು ಪರಿಹರಿಸಲು ನನಗೆ ಸಹಾಯ ಮಾಡಿದೆ.

ನಾವು ಸ್ಟ್ರೀಮ್ ಡೇಟಾ ಸಂಸ್ಕರಣಾ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ರಚಿಸುತ್ತೇವೆ. ಭಾಗ 2
ಚಿತ್ರ 4: ಬೀಮ್ ಕನ್ವೇಯರ್

BigQuery ನಲ್ಲಿ ನಮ್ಮ ಡೇಟಾವನ್ನು ಪ್ರವೇಶಿಸಿ

ಆದ್ದರಿಂದ, ನಾವು ಈಗಾಗಲೇ ನಮ್ಮ ಟೇಬಲ್‌ಗೆ ಡೇಟಾ ಹರಿಯುವ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ಹೊಂದಿರಬೇಕು. ಇದನ್ನು ಪರೀಕ್ಷಿಸಲು, ನಾವು BigQuery ಗೆ ಹೋಗಬಹುದು ಮತ್ತು ಡೇಟಾವನ್ನು ನೋಡಬಹುದು. ಕೆಳಗಿನ ಆಜ್ಞೆಯನ್ನು ಬಳಸಿದ ನಂತರ ನೀವು ಡೇಟಾಸೆಟ್‌ನ ಮೊದಲ ಕೆಲವು ಸಾಲುಗಳನ್ನು ನೋಡಬೇಕು. ಈಗ ನಾವು BigQuery ನಲ್ಲಿ ಡೇಟಾವನ್ನು ಸಂಗ್ರಹಿಸಿದ್ದೇವೆ, ನಾವು ಹೆಚ್ಚಿನ ವಿಶ್ಲೇಷಣೆಯನ್ನು ನಡೆಸಬಹುದು, ಹಾಗೆಯೇ ಡೇಟಾವನ್ನು ಸಹೋದ್ಯೋಗಿಗಳೊಂದಿಗೆ ಹಂಚಿಕೊಳ್ಳಬಹುದು ಮತ್ತು ವ್ಯಾಪಾರ ಪ್ರಶ್ನೆಗಳಿಗೆ ಉತ್ತರಿಸಲು ಪ್ರಾರಂಭಿಸಬಹುದು.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

ನಾವು ಸ್ಟ್ರೀಮ್ ಡೇಟಾ ಸಂಸ್ಕರಣಾ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ರಚಿಸುತ್ತೇವೆ. ಭಾಗ 2
ಚಿತ್ರ 5: BigQuery

ತೀರ್ಮಾನಕ್ಕೆ

ಈ ಪೋಸ್ಟ್ ಸ್ಟ್ರೀಮಿಂಗ್ ಡೇಟಾ ಪೈಪ್‌ಲೈನ್ ಅನ್ನು ರಚಿಸುವ ಉಪಯುಕ್ತ ಉದಾಹರಣೆಯಾಗಿ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತದೆ ಎಂದು ನಾವು ಭಾವಿಸುತ್ತೇವೆ, ಜೊತೆಗೆ ಡೇಟಾವನ್ನು ಹೆಚ್ಚು ಪ್ರವೇಶಿಸಲು ಮಾರ್ಗಗಳನ್ನು ಹುಡುಕುತ್ತೇವೆ. ಈ ಸ್ವರೂಪದಲ್ಲಿ ಡೇಟಾವನ್ನು ಸಂಗ್ರಹಿಸುವುದು ನಮಗೆ ಅನೇಕ ಪ್ರಯೋಜನಗಳನ್ನು ನೀಡುತ್ತದೆ. ನಮ್ಮ ಉತ್ಪನ್ನವನ್ನು ಎಷ್ಟು ಜನರು ಬಳಸುತ್ತಾರೆ ಎಂಬಂತಹ ಪ್ರಮುಖ ಪ್ರಶ್ನೆಗಳಿಗೆ ನಾವು ಈಗ ಉತ್ತರಿಸಲು ಪ್ರಾರಂಭಿಸಬಹುದು? ನಿಮ್ಮ ಬಳಕೆದಾರ ನೆಲೆಯು ಕಾಲಾನಂತರದಲ್ಲಿ ಬೆಳೆಯುತ್ತಿದೆಯೇ? ಉತ್ಪನ್ನದ ಯಾವ ಅಂಶಗಳೊಂದಿಗೆ ಜನರು ಹೆಚ್ಚು ಸಂವಹನ ನಡೆಸುತ್ತಾರೆ? ಮತ್ತು ಇರಬಾರದ ಸ್ಥಳದಲ್ಲಿ ದೋಷಗಳಿವೆಯೇ? ಈ ಪ್ರಶ್ನೆಗಳು ಸಂಸ್ಥೆಗೆ ಆಸಕ್ತಿಯನ್ನುಂಟುಮಾಡುತ್ತವೆ. ಈ ಪ್ರಶ್ನೆಗಳಿಗೆ ಉತ್ತರಗಳಿಂದ ಹೊರಹೊಮ್ಮುವ ಒಳನೋಟಗಳ ಆಧಾರದ ಮೇಲೆ, ನಾವು ಉತ್ಪನ್ನವನ್ನು ಸುಧಾರಿಸಬಹುದು ಮತ್ತು ಬಳಕೆದಾರರ ತೊಡಗಿಸಿಕೊಳ್ಳುವಿಕೆಯನ್ನು ಹೆಚ್ಚಿಸಬಹುದು.

ಬೀಮ್ ಈ ರೀತಿಯ ವ್ಯಾಯಾಮಕ್ಕೆ ನಿಜವಾಗಿಯೂ ಉಪಯುಕ್ತವಾಗಿದೆ ಮತ್ತು ಹಲವಾರು ಇತರ ಆಸಕ್ತಿದಾಯಕ ಬಳಕೆಯ ಸಂದರ್ಭಗಳನ್ನು ಹೊಂದಿದೆ. ಉದಾಹರಣೆಗೆ, ನೀವು ನೈಜ ಸಮಯದಲ್ಲಿ ಸ್ಟಾಕ್ ಟಿಕ್ ಡೇಟಾವನ್ನು ವಿಶ್ಲೇಷಿಸಲು ಮತ್ತು ವಿಶ್ಲೇಷಣೆಯ ಆಧಾರದ ಮೇಲೆ ವಹಿವಾಟುಗಳನ್ನು ಮಾಡಲು ಬಯಸಬಹುದು, ಬಹುಶಃ ನೀವು ವಾಹನಗಳಿಂದ ಬರುವ ಸಂವೇದಕ ಡೇಟಾವನ್ನು ಹೊಂದಿದ್ದೀರಿ ಮತ್ತು ಟ್ರಾಫಿಕ್ ಮಟ್ಟದ ಲೆಕ್ಕಾಚಾರಗಳನ್ನು ಲೆಕ್ಕಾಚಾರ ಮಾಡಲು ಬಯಸುತ್ತೀರಿ. ಉದಾಹರಣೆಗೆ, ನೀವು ಬಳಕೆದಾರರ ಡೇಟಾವನ್ನು ಸಂಗ್ರಹಿಸುವ ಮತ್ತು ಪ್ರಮುಖ ಮೆಟ್ರಿಕ್‌ಗಳನ್ನು ಟ್ರ್ಯಾಕ್ ಮಾಡಲು ಡ್ಯಾಶ್‌ಬೋರ್ಡ್‌ಗಳನ್ನು ರಚಿಸಲು ಅದನ್ನು ಬಳಸುವ ಗೇಮಿಂಗ್ ಕಂಪನಿಯಾಗಿರಬಹುದು. ಸರಿ, ಮಹನೀಯರೇ, ಇದು ಮತ್ತೊಂದು ಪೋಸ್ಟ್‌ಗೆ ವಿಷಯವಾಗಿದೆ, ಓದಿದ್ದಕ್ಕಾಗಿ ಧನ್ಯವಾದಗಳು, ಮತ್ತು ಸಂಪೂರ್ಣ ಕೋಡ್ ಅನ್ನು ನೋಡಲು ಬಯಸುವವರಿಗೆ, ನನ್ನ GitHub ಗೆ ಲಿಂಕ್ ಕೆಳಗೆ ಇದೆ.

https://github.com/DFoly/User_log_pipeline

ಅದು ಅಷ್ಟೆ. ಭಾಗ ಒಂದನ್ನು ಓದಿ.

ಮೂಲ: www.habr.com

ಕಾಮೆಂಟ್ ಅನ್ನು ಸೇರಿಸಿ