Δημιουργούμε έναν αγωγό επεξεργασίας δεδομένων ροής. Μέρος 2ο

Γεια σε όλους. Μοιραζόμαστε τη μετάφραση του τελευταίου μέρους του άρθρου, που ετοιμάστηκε ειδικά για φοιτητές του μαθήματος. Μηχανικός Δεδομένων. Μπορείτε να διαβάσετε το πρώτο μέρος εδώ.

Apache Beam και DataFlow για αγωγούς σε πραγματικό χρόνο

Δημιουργούμε έναν αγωγό επεξεργασίας δεδομένων ροής. Μέρος 2ο

Ρύθμιση του Google Cloud

Σημείωση: Χρησιμοποίησα το Google Cloud Shell για να εκτελέσω τη διοχέτευση και να δημοσιεύσω προσαρμοσμένα δεδομένα καταγραφής επειδή αντιμετώπιζα πρόβλημα με τη λειτουργία της διοχέτευσης στην Python 3. Το Google Cloud Shell χρησιμοποιεί Python 2, το οποίο είναι πιο συνεπές με το Apache Beam.

Για να ξεκινήσετε τον αγωγό, πρέπει να ψάξουμε λίγο στις ρυθμίσεις. Για όσους από εσάς δεν έχετε χρησιμοποιήσει το GCP στο παρελθόν, θα χρειαστεί να ακολουθήσετε τα ακόλουθα 6 βήματα που περιγράφονται σε αυτό σελίδα.

Μετά από αυτό, θα χρειαστεί να ανεβάσουμε τα σενάρια μας στο Google Cloud Storage και να τα αντιγράψουμε στο Google Cloud Shel μας. Η μεταφόρτωση στο χώρο αποθήκευσης cloud είναι αρκετά ασήμαντη (μπορεί να βρεθεί μια περιγραφή εδώ). Για να αντιγράψουμε τα αρχεία μας, μπορούμε να ανοίξουμε το Google Cloud Shel από τη γραμμή εργαλείων κάνοντας κλικ στο πρώτο εικονίδιο στα αριστερά στην Εικόνα 2 παρακάτω.

Δημιουργούμε έναν αγωγό επεξεργασίας δεδομένων ροής. Μέρος 2ο
Σχήμα 2

Οι εντολές που χρειαζόμαστε για να αντιγράψουμε τα αρχεία και να εγκαταστήσουμε τις απαιτούμενες βιβλιοθήκες παρατίθενται παρακάτω.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Δημιουργία της βάσης δεδομένων και του πίνακα μας

Αφού ολοκληρώσουμε όλα τα βήματα που σχετίζονται με τη ρύθμιση, το επόμενο πράγμα που πρέπει να κάνουμε είναι να δημιουργήσουμε ένα σύνολο δεδομένων και πίνακα στο BigQuery. Υπάρχουν διάφοροι τρόποι για να το κάνετε αυτό, αλλά ο πιο απλός είναι να χρησιμοποιήσετε την κονσόλα Google Cloud δημιουργώντας πρώτα ένα σύνολο δεδομένων. Μπορείτε να ακολουθήσετε τα παρακάτω βήματα σύνδεσμοςγια να δημιουργήσετε έναν πίνακα με ένα σχήμα. Το τραπέζι μας θα έχει 7 στήλες, που αντιστοιχεί στα στοιχεία κάθε αρχείου καταγραφής χρηστών. Για ευκολία, θα ορίσουμε όλες τις στήλες ως συμβολοσειρές, εκτός από τη μεταβλητή timelocal, και θα τις ονομάσουμε σύμφωνα με τις μεταβλητές που δημιουργήσαμε νωρίτερα. Η διάταξη του πίνακα μας θα πρέπει να μοιάζει με την εικόνα 3.

Δημιουργούμε έναν αγωγό επεξεργασίας δεδομένων ροής. Μέρος 2ο
Εικόνα 3. Διάταξη πίνακα

Δημοσίευση δεδομένων καταγραφής χρηστών

Το Pub/Sub είναι ένα κρίσιμο στοιχείο του αγωγού μας επειδή επιτρέπει σε πολλαπλές ανεξάρτητες εφαρμογές να επικοινωνούν μεταξύ τους. Συγκεκριμένα, λειτουργεί ως ενδιάμεσος που μας επιτρέπει να στέλνουμε και να λαμβάνουμε μηνύματα μεταξύ των εφαρμογών. Το πρώτο πράγμα που πρέπει να κάνουμε είναι να δημιουργήσουμε ένα θέμα. Απλώς μεταβείτε στο Pub/Sub στην κονσόλα και κάντε κλικ στο CREATE TOPIC.

Ο παρακάτω κώδικας καλεί το σενάριό μας για να δημιουργήσει τα δεδομένα καταγραφής που ορίζονται παραπάνω και στη συνέχεια συνδέει και στέλνει τα αρχεία καταγραφής στο Pub/Sub. Το μόνο που χρειάζεται να κάνουμε είναι να δημιουργήσουμε ένα αντικείμενο PublisherClient, καθορίστε τη διαδρομή προς το θέμα χρησιμοποιώντας τη μέθοδο topic_path και καλέστε τη συνάρτηση publish с topic_path και δεδομένα. Σημειώστε ότι εισάγουμε generate_log_line από το σενάριο μας stream_logs, επομένως βεβαιωθείτε ότι αυτά τα αρχεία βρίσκονται στον ίδιο φάκελο, διαφορετικά θα εμφανιστεί σφάλμα εισαγωγής. Στη συνέχεια, μπορούμε να το εκτελέσουμε μέσω της κονσόλας Google χρησιμοποιώντας:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Μόλις εκτελεστεί το αρχείο, θα μπορούμε να δούμε την έξοδο των δεδομένων καταγραφής στην κονσόλα, όπως φαίνεται στην παρακάτω εικόνα. Αυτό το σενάριο θα λειτουργεί όσο δεν το χρησιμοποιούμε CTRL + Cγια να το ολοκληρώσετε.

Δημιουργούμε έναν αγωγό επεξεργασίας δεδομένων ροής. Μέρος 2ο
Εικόνα 4. Έξοδος publish_logs.py

Γράφοντας τον κώδικα του αγωγού μας

Τώρα που έχουμε τα πάντα έτοιμα, μπορούμε να ξεκινήσουμε το διασκεδαστικό μέρος - κωδικοποίηση του αγωγού μας χρησιμοποιώντας Beam και Python. Για να δημιουργήσουμε μια σωλήνωση Beam, πρέπει να δημιουργήσουμε ένα αντικείμενο pipeline (p). Αφού δημιουργήσουμε ένα αντικείμενο pipeline, μπορούμε να εφαρμόσουμε πολλαπλές συναρτήσεις η μία μετά την άλλη χρησιμοποιώντας τον τελεστή pipe (|). Σε γενικές γραμμές, η ροή εργασίας μοιάζει με την παρακάτω εικόνα.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Στον κώδικά μας, θα δημιουργήσουμε δύο προσαρμοσμένες συναρτήσεις. Λειτουργία regex_clean, το οποίο σαρώνει τα δεδομένα και ανακτά την αντίστοιχη σειρά με βάση τη λίστα PATTERNS χρησιμοποιώντας τη συνάρτηση re.search. Η συνάρτηση επιστρέφει μια συμβολοσειρά διαχωρισμένη με κόμμα. Εάν δεν είστε ειδικός στην κανονική έκφραση, σας συνιστώ να το ελέγξετε φροντιστήριο και εξασκηθείτε σε ένα σημειωματάριο για να ελέγξετε τον κώδικα. Μετά από αυτό ορίζουμε μια προσαρμοσμένη συνάρτηση ParDo που ονομάζεται Σπλιτ, που είναι μια παραλλαγή του μετασχηματισμού Beam για παράλληλη επεξεργασία. Στην Python, αυτό γίνεται με έναν ειδικό τρόπο - πρέπει να δημιουργήσουμε μια κλάση που κληρονομεί από την κλάση DoFn Beam. Η συνάρτηση Split παίρνει την αναλυμένη σειρά από την προηγούμενη συνάρτηση και επιστρέφει μια λίστα λεξικών με κλειδιά που αντιστοιχούν στα ονόματα των στηλών στον πίνακα BigQuery. Υπάρχει κάτι που πρέπει να σημειωθεί σχετικά με αυτήν τη λειτουργία: έπρεπε να κάνω εισαγωγή datetime μέσα σε μια συνάρτηση για να λειτουργήσει. Έβλεπα ένα σφάλμα εισαγωγής στην αρχή του αρχείου, το οποίο ήταν περίεργο. Αυτή η λίστα μεταβιβάζεται στη συνέχεια στη συνάρτηση WriteToBigQuery, το οποίο απλώς προσθέτει τα δεδομένα μας στον πίνακα. Ο κωδικός για εργασία Batch DataFlow Job και Streaming DataFlow Job δίνεται παρακάτω. Η μόνη διαφορά μεταξύ κώδικα δέσμης και ροής είναι ότι κατά παρτίδες διαβάζουμε το CSV από src_pathχρησιμοποιώντας τη συνάρτηση ReadFromText από το Beam.

Εργασία ροής δεδομένων παρτίδας (επεξεργασία παρτίδας)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Εργασία ροής δεδομένων ροής (επεξεργασία ροής)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Εκκίνηση του μεταφορέα

Μπορούμε να τρέξουμε τον αγωγό με πολλούς διαφορετικούς τρόπους. Αν θέλαμε, θα μπορούσαμε απλώς να το εκτελέσουμε τοπικά από ένα τερματικό ενώ συνδέουμε εξ αποστάσεως στο GCP.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Ωστόσο, πρόκειται να το εκτελέσουμε χρησιμοποιώντας το DataFlow. Μπορούμε να το κάνουμε χρησιμοποιώντας την παρακάτω εντολή ορίζοντας τις παρακάτω απαιτούμενες παραμέτρους.

  • project — Αναγνωριστικό του έργου σας GCP.
  • runner είναι ένας δρομολογητής αγωγών που θα αναλύσει το πρόγραμμά σας και θα κατασκευάσει τον αγωγό σας. Για να τρέξετε στο cloud, πρέπει να καθορίσετε ένα DataflowRunner.
  • staging_location — η διαδρομή προς τον χώρο αποθήκευσης cloud Dataflow για την ευρετηρίαση πακέτων κώδικα που χρειάζονται οι επεξεργαστές που εκτελούν την εργασία.
  • temp_location — διαδρομή προς το χώρο αποθήκευσης cloud Dataflow Cloud για την αποθήκευση προσωρινών αρχείων εργασιών που δημιουργούνται ενώ εκτελείται η διοχέτευση.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Ενώ εκτελείται αυτή η εντολή, μπορούμε να μεταβούμε στην καρτέλα DataFlow στην κονσόλα Google και να προβάλουμε τη διοχέτευσή μας. Όταν κάνουμε κλικ στο pipeline, θα πρέπει να δούμε κάτι παρόμοιο με το Σχήμα 4. Για λόγους εντοπισμού σφαλμάτων, μπορεί να είναι πολύ χρήσιμο να μεταβείτε στα αρχεία καταγραφής και μετά στο Stackdriver για να δείτε λεπτομερή αρχεία καταγραφής. Αυτό με βοήθησε να επιλύσω ζητήματα αγωγών σε πολλές περιπτώσεις.

Δημιουργούμε έναν αγωγό επεξεργασίας δεδομένων ροής. Μέρος 2ο
Εικόνα 4: Μεταφορέας δοκού

Πρόσβαση στα δεδομένα μας στο BigQuery

Έτσι, θα πρέπει να έχουμε ήδη μια διοχέτευση που τρέχει με δεδομένα που ρέουν στον πίνακά μας. Για να το ελέγξουμε αυτό, μπορούμε να πάμε στο BigQuery και να δούμε τα δεδομένα. Αφού χρησιμοποιήσετε την παρακάτω εντολή, θα πρέπει να δείτε τις πρώτες λίγες σειρές του συνόλου δεδομένων. Τώρα που έχουμε τα δεδομένα αποθηκευμένα στο BigQuery, μπορούμε να κάνουμε περαιτέρω ανάλυση, καθώς και να μοιραστούμε τα δεδομένα με συναδέλφους και να αρχίσουμε να απαντάμε σε επιχειρηματικές ερωτήσεις.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Δημιουργούμε έναν αγωγό επεξεργασίας δεδομένων ροής. Μέρος 2ο
Εικόνα 5: BigQuery

Συμπέρασμα

Ελπίζουμε αυτή η ανάρτηση να χρησιμεύσει ως χρήσιμο παράδειγμα για τη δημιουργία ενός αγωγού δεδομένων ροής, καθώς και για την εύρεση τρόπων για να γίνουν τα δεδομένα πιο προσιτά. Η αποθήκευση δεδομένων σε αυτή τη μορφή μας δίνει πολλά πλεονεκτήματα. Τώρα μπορούμε να αρχίσουμε να απαντάμε σε σημαντικές ερωτήσεις όπως πόσοι άνθρωποι χρησιμοποιούν το προϊόν μας; Η βάση των χρηστών σας αυξάνεται με την πάροδο του χρόνου; Με ποιες πτυχές του προϊόντος αλληλεπιδρούν περισσότερο οι άνθρωποι; Και υπάρχουν λάθη εκεί που δεν πρέπει να υπάρχουν; Αυτά είναι τα ερωτήματα που θα ενδιαφέρουν τον οργανισμό. Με βάση τις πληροφορίες που προκύπτουν από τις απαντήσεις σε αυτές τις ερωτήσεις, μπορούμε να βελτιώσουμε το προϊόν και να αυξήσουμε την αφοσίωση των χρηστών.

Το Beam είναι πραγματικά χρήσιμο για αυτό το είδος άσκησης και έχει επίσης μια σειρά από άλλες ενδιαφέρουσες περιπτώσεις χρήσης. Για παράδειγμα, μπορεί να θέλετε να αναλύσετε τα δεδομένα των μετοχών σε πραγματικό χρόνο και να κάνετε συναλλαγές με βάση την ανάλυση, ίσως έχετε δεδομένα αισθητήρων που προέρχονται από οχήματα και θέλετε να υπολογίσετε τους υπολογισμούς του επιπέδου κίνησης. Θα μπορούσατε επίσης, για παράδειγμα, να είστε μια εταιρεία τυχερών παιχνιδιών που συλλέγει δεδομένα χρηστών και τα χρησιμοποιεί για τη δημιουργία πινάκων εργαλείων για την παρακολούθηση βασικών μετρήσεων. Εντάξει, κύριοι, αυτό είναι ένα θέμα για μια άλλη ανάρτηση, ευχαριστώ για την ανάγνωση, και για όσους θέλουν να δουν τον πλήρη κώδικα, παρακάτω είναι ο σύνδεσμος για το GitHub μου.

https://github.com/DFoly/User_log_pipeline

Αυτό είναι όλο. Διαβάστε το πρώτο μέρος.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο