Mir kreéieren eng Stroumdatenveraarbechtungspipeline. Deel 2

Moien alleguer. Mir deelen d'Iwwersetzung vum leschten Deel vum Artikel, speziell fir Studenten vum Cours virbereet. Daten Ingenieur. Dir kënnt den éischten Deel liesen hei.

Apache Beam an DataFlow fir Echtzäit Pipelines

Mir kreéieren eng Stroumdatenveraarbechtungspipeline. Deel 2

Google Cloud opsetzen

Bemierkung: Ech hunn Google Cloud Shell benotzt fir d'Pipeline auszeféieren an personaliséiert Logdaten ze verëffentlechen, well ech Problemer haten d'Pipeline am Python 3 ze lafen. Google Cloud Shell benotzt Python 2, wat méi konsequent mat Apache Beam ass.

Fir d'Pipeline ze starten, musse mir e bëssen an d'Astellunge gräifen. Fir déi vun iech, déi GCP net virdru benotzt hunn, musst Dir déi folgend 6 Schrëtt verfollegen, déi an dësem beschriwwe sinn. Säit.

Duerno musse mir eis Scripten op Google Cloud Storage eroplueden an se op eis Google Cloud Shel kopéieren. Eroplueden op Cloud Storage ass zimmlech trivial (eng Beschreiwung ka fonnt ginn hei). Fir eis Dateien ze kopéieren, kënne mir Google Cloud Shel vun der Toolbar opmaachen andeems Dir op déi éischt Ikon op der lénkser Säit an der Figur 2 klickt.

Mir kreéieren eng Stroumdatenveraarbechtungspipeline. Deel 2
2 Figur

D'Befehle déi mir brauchen fir d'Dateien ze kopéieren an déi erfuerderlech Bibliothéiken z'installéieren sinn hei ënnendrënner opgezielt.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Schafen eis Datebank an Dësch

Wann mir all d'Setup-relatéierte Schrëtt ofgeschloss hunn, ass déi nächst Saach, déi mir maache mussen, en Dataset an Dësch an BigQuery erstellen. Et gi verschidde Weeër fir dëst ze maachen, awer am einfachsten ass d'Google Cloud Konsole ze benotzen andeems Dir als éischt en Dataset erstellt. Dir kënnt d'Schrëtt hei ënnen verfollegen Linkfir en Dësch mat engem Schema ze kreéieren. Eisen Dësch wäert hunn 7 Kolonnen, entspriechend de Komponente vun all Benotzer Log. Fir d'Bequemlechkeet wäerte mir all Sailen als Saiten definéieren, ausser déi Zäitlokal Variabel, an nennen se no de Variablen déi mir virdru generéiert hunn. De Layout vun eisem Dësch soll ausgesinn wéi an der Figur 3.

Mir kreéieren eng Stroumdatenveraarbechtungspipeline. Deel 2
Figur 3. Dësch Layout

Verëffentlechung vum Benotzerlogdaten

Pub/Sub ass e kriteschen Bestanddeel vun eiser Pipeline well et erlaabt verschidde onofhängeg Uwendungen mateneen ze kommunizéieren. Besonnesch funktionnéiert et als Tëschestatioun, deen eis erlaabt Messagen tëscht Uwendungen ze schécken an ze kréien. Dat éischt wat mir maache mussen ass en Thema erstellen. Gitt einfach op Pub / Sub an der Konsole a klickt CREATE TOPIC.

De Code hei drënner rifft eise Skript fir d'Logdaten ze generéieren déi hei uewen definéiert sinn an dann verbënnt a schéckt d'Logbicher op Pub / Sub. Dat eenzegt wat mir maache mussen ass en Objet erstellen PublisherClient, spezifizéiert de Wee zum Thema mat der Method topic_path an ruffen d'Funktioun publish с topic_path an daten. Maacht weg datt mir importéieren generate_log_line vun eisem Skript stream_logs, also vergewëssert Iech datt dës Dateien am selwechten Dossier sinn, soss kritt Dir en Importfehler. Mir kënnen dat dann duerch eis Google Konsol lafen mat:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Soubal d'Datei leeft, kënne mir d'Ausgab vun de Logdaten op d'Konsole gesinn, wéi an der Figur hei ënnen. Dëst Skript funktionnéiert soulaang mir net benotzen CTRL + Cet fäerdeg ze maachen.

Mir kreéieren eng Stroumdatenveraarbechtungspipeline. Deel 2
Figur 4. Ausgang publish_logs.py

Schreiwen eise Pipeline Code

Elo datt mir alles virbereet hunn, kënne mir de Spaass Deel ufänken - eis Pipeline codéieren mat Beam a Python. Fir eng Beam Pipeline ze kreéieren, musse mir e Pipelineobjekt (p) erstellen. Wann mir e Pipeline-Objet erstallt hunn, kënne mir verschidde Funktiounen no der anerer benotze mam Bedreiwer pipe (|). Am Allgemengen gesäit de Workflow aus wéi d'Bild hei drënner.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

An eisem Code erstellen mir zwou personaliséiert Funktiounen. Funktioun regex_clean, déi d'Donnéeën scannt an déi entspriechend Zeil op Basis vun der PATTERNS Lëscht mat der Funktioun zréckhëlt re.search. D'Funktioun gëtt eng Comma getrennt String zréck. Wann Dir net e regelméisseg Ausdrock Expert sidd, Ech recommandéieren dëst ze kontrolléieren Tutorial a Praxis an engem Notizblock fir de Code ze kontrolléieren. Duerno definéiere mir eng personaliséiert ParDo Funktioun genannt Split, wat eng Variatioun vum Beam Transform fir parallel Veraarbechtung ass. Am Python gëtt dëst op eng speziell Manéier gemaach - mir mussen eng Klass erstellen déi vun der DoFn Beam Klass ierft. D'Split Funktioun hëlt déi parséiert Zeil vun der viregter Funktioun a gëtt eng Lëscht vun Dictionnairen zréck mat Schlësselen, déi zu de Kolonnennimm an eiser BigQuery Tabell entspriechen. Et gëtt eppes iwwer dës Funktioun ze notéieren: Ech hu missen importéieren datetime bannent enger Funktioun fir et ze schaffen. Ech krut en Importfehler am Ufank vun der Datei, wat komesch war. Dës Lëscht gëtt dann un d'Funktioun weiderginn WriteToBigQuery, déi einfach eis Donnéeën un den Dësch bäidréit. De Code fir Batch DataFlow Job a Streaming DataFlow Job gëtt hei ënnen uginn. Deen eenzegen Ënnerscheed tëscht Batch a Streaming Code ass datt a Batch mir d'CSV liesen src_pathbenotzt d'Funktioun ReadFromText vum Beam.

Batch DataFlow Job (Batchveraarbechtung)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (Streamveraarbechtung)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

De Fërderband starten

Mir kënnen d'Pipeline op verschidde Weeër lafen. Wa mir wollten, kënne mir et just lokal aus engem Terminal lafen wärend mir op GCP op afstand aloggen.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Wéi och ëmmer, mir lafen et mat DataFlow. Mir kënnen dat mat dem Kommando ënnen maachen andeems Dir déi folgend erfuerderlech Parameter setzt.

  • project - ID vun Ärem GCP Projet.
  • runner ass e Pipeline Runner deen Äre Programm analyséiert an Är Pipeline konstruéiert. Fir an der Wollek ze lafen, musst Dir en DataflowRunner spezifizéieren.
  • staging_location - de Wee op d'Cloud Dataflow Cloud Storage fir d'Indexéiere vu Code Packagen, déi vun de Prozessoren néideg sinn, déi d'Aarbecht ausféieren.
  • temp_location - Wee op d'Cloud Dataflow Cloud Storage fir temporär Aarbechtsdateien ze späicheren, déi erstallt gi wärend der Pipeline leeft.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Wärend dëse Kommando leeft, kënne mir op den DataFlow Tab an der Google Konsole goen an eis Pipeline kucken. Wann mir op d'Pipeline klickt, sollte mir eppes ähnlech wéi d'Bild gesinn 4. Fir Debugging Zwecker kann et ganz hëllefräich sinn fir op Logs ze goen an dann op Stackdriver fir déi detailléiert Logbicher ze gesinn. Dëst huet mir gehollef Pipeline Themen an enger Rei vu Fäll ze léisen.

Mir kreéieren eng Stroumdatenveraarbechtungspipeline. Deel 2
Figur 4: Beam conveyor

Zougrëff op eis Donnéeën am BigQuery

Also, mir sollten schonn eng Pipeline lafen mat Daten déi an eisen Dësch fléien. Fir dëst ze testen, kënne mir op BigQuery goen an d'Donnéeën kucken. Nodeems Dir de Kommando hei ënnen benotzt hutt, sollt Dir déi éischt Reihen vum Datesaz gesinn. Elo datt mir d'Donnéeën am BigQuery gespäichert hunn, kënne mir weider Analyse maachen, souwéi d'Donnéeën mat Kollegen deelen an ufänken Geschäftsfroen ze beäntweren.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Mir kreéieren eng Stroumdatenveraarbechtungspipeline. Deel 2
Figur 5: BigQuery

Konklusioun

Mir hoffen, datt dëse Post als nëtzlecht Beispill déngt fir eng Streaming Date Pipeline ze kreéieren, souwéi Weeër ze fannen fir Daten méi zougänglech ze maachen. Daten an dësem Format späicheren gëtt eis vill Virdeeler. Elo kënne mir ufänken wichteg Froen ze beäntweren wéi wéi vill Leit eise Produkt benotzen? Wuesse Är Benotzerbasis mat der Zäit? Mat wat fir Aspekter vum Produkt interagéieren d'Leit am meeschten? A ginn et Feeler wou et net sollt sinn? Dëst sinn d'Froen, déi fir d'Organisatioun interessant sinn. Baséierend op den Abléck, déi aus den Äntwerten op dës Froen erauskommen, kënne mir d'Produkt verbesseren an d'Benotzer Engagement erhéijen.

Beam ass wierklech nëtzlech fir dës Zort Übung an huet och eng Rei aner interessant Benotzungsfäll. Zum Beispill, Dir wëllt Aktie Tick-Daten an Echtzäit analyséieren an Handel maachen op Basis vun der Analyse, vläicht hutt Dir Sensordaten, déi vu Gefierer kommen a wëllt Berechnunge vum Trafficniveau berechnen. Dir kënnt och, zum Beispill, eng Spillfirma sinn déi Benotzerdaten sammelt a benotzt se fir Dashboards ze kreéieren fir Schlësselmetriken ze verfolgen. Okay, Hären, dëst ass en Thema fir en anere Post, Merci fir d'Liesen, a fir déi, déi de ganze Code wëllen gesinn, hei ënnen ass de Link op meng GitHub.

https://github.com/DFoly/User_log_pipeline

Dat ass alles. Liesen Deel eent.

Source: will.com

Setzt e Commentaire