Ons skep 'n pyplyn vir stroomdataverwerking. Deel 2

Hi almal. Ons deel die vertaling van die laaste deel van die artikel, wat spesifiek vir die studente van die kursus voorberei is Data Ingenieur. Die eerste deel kan gevind word hier.

Apache Beam en DataFlow vir intydse pyplyne

Ons skep 'n pyplyn vir stroomdataverwerking. Deel 2

Stel Google Cloud op

Let wel: Ek het Google Cloud Shell gebruik om die pyplyn te laat loop en gebruikerslogdata te publiseer omdat ek probleme ondervind het om die pyplyn in Python 3 te laat loop. Google Cloud Shell gebruik Python 2, wat meer ooreenstem met Apache Beam.

Om die pyplyn aan die gang te kry, moet ons 'n bietjie in die instellings delf. Vir diΓ© van julle wat nog nooit GCP gebruik het nie, moet jy die volgende 6 stappe hierin voltooi bladsy.

Daarna sal ons ons skrifte na Google Cloud Storage moet oplaai en dit na ons Google Cloud Shel moet kopieer. Oplaai na wolkberging is nogal triviaal (beskrywing kan gevind word hier). Om ons lΓͺers te kopieer, kan ons Google Cloud Shel vanaf die nutsbalk oopmaak deur op die eerste ikoon aan die linkerkant in Figuur 2 hieronder te klik.

Ons skep 'n pyplyn vir stroomdataverwerking. Deel 2
Figuur 2

Die opdragte wat ons nodig het om die lΓͺers te kopieer en die vereiste biblioteke te installeer, word hieronder gelys.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Die skep van ons databasis en tabel

Sodra ons al die opstellingstappe voltooi het, is die volgende ding wat ons moet doen om 'n datastel en tabel in BigQuery te skep. Daar is verskeie maniere om dit te doen, maar die eenvoudigste is om die Google Wolk-konsole te gebruik deur eers 'n datastel te skep. Jy kan die stappe in die volgende volg skakelom 'n tabel met 'n skema te skep. Ons tafel sal hΓͺ 7 kolomme, wat ooreenstem met die komponente van elke gebruikerslogboek. Gerieflikheidshalwe sal ons alle kolomme definieer as stringe (van tipe string), behalwe vir die tydlokale veranderlike, en hulle benoem volgens die veranderlikes wat ons vroeΓ«r gegenereer het. Ons tabeluitleg moet soos Figuur 3 lyk.

Ons skep 'n pyplyn vir stroomdataverwerking. Deel 2
Figuur 3. Tabelskema

Publikasie van gebruikerslogdata

Pub/Sub is 'n kritieke komponent van ons pyplyn aangesien dit verskeie onafhanklike toepassings toelaat om met mekaar te kommunikeer. Dit werk spesifiek as 'n tussenganger wat ons toelaat om boodskappe tussen toepassings te stuur en te ontvang. Die eerste ding wat ons moet doen is om 'n onderwerp (onderwerp) te skep. Gaan eenvoudig na Pub/Sub in die konsole en klik SKEP ONDERWERP.

Die kode hieronder noem ons skrif om die logdata wat hierbo gedefinieer is te genereer en koppel dan en stuur die logs na Pub/Sub. Die enigste ding wat ons moet doen is om 'n voorwerp te skep Publisher Client, spesifiseer die temapad deur die metode te gebruik topic_path en roep die funksie publish с topic_path en data. Neem asseblief kennis dat ons invoer generate_log_line uit ons draaiboek stream_logsmaak dus seker daardie lΓͺers is in dieselfde vouer anders sal jy 'n invoerfout kry. Ons kan dit dan deur ons Google-konsole laat loop deur:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Sodra die lΓͺer uitgevoer is, sal ons die uitvoer van die logdata na die konsole kan waarneem, soos in die figuur hieronder getoon. Hierdie skrip sal loop solank ons ​​nie gebruik nie Ctrl + Com dit te voltooi.

Ons skep 'n pyplyn vir stroomdataverwerking. Deel 2
Figuur 4. Gevolgtrekking publish_logs.py

Die skryf van ons pyplynkode

Noudat ons alles opgestel het, kan ons by die prettige deel kom – die kodering van ons pyplyn met Beam en Python. Om 'n Beam-pyplyn te skep, moet ons 'n pyplynvoorwerp skep (p). Sodra ons 'n pyplynvoorwerp geskep het, kan ons verskeie funksies een na die ander toepas deur die operateur te gebruik pipe (|). Oor die algemeen lyk die werkvloei soos die figuur hieronder.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

In ons kode sal ons twee gebruikersgedefinieerde funksies skep. Funksie regex_clean, wat die data skandeer en die ooreenstemmende string onttrek op grond van die lys PATTERNS met behulp van die funksie re.search. Die funksie gee 'n komma-geskeide string terug. As jy nie 'n kenner van gereelde uitdrukkings is nie, beveel ek aan om dit na te gaan. tutoriaal en oefen in notaboek om die kode te toets. Ons definieer dan 'n persoonlike ParDo funksie genoem Split, wat 'n variasie van Beam transform vir parallelle verwerking is. In Python word dit op 'n spesiale manier gedoen - ons moet 'n klas skep wat van die DoFn Beam-klas erf. Die Split-funksie neem die ontleed string van die vorige funksie af en gee 'n lys van woordeboeke terug met sleutels wat ooreenstem met die kolomname in ons BigQuery-tabel. Daar is iets om op te let oor hierdie funksie: ek moes invoer datetime binne die funksie om dit te laat werk. Ek het 'n fout met invoer aan die begin van die lΓͺer gekry, wat vreemd was. Hierdie lys word dan na die funksie oorgedra WriteToBigQuery, wat eenvoudig ons data by die tabel voeg. Die kode vir Batch DataFlow Job en Streaming DataFlow Job word hieronder gegee. Die enigste verskil tussen bondel- en stroomkode is dat ons in bondel CSV van lees src_pathdie funksie te gebruik ReadFromText van Beam.

Batch DataFlow Job (joernaalverwerking)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Stroom DataFlow Job (stroomverwerking)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Die pyplyn bestuur

Ons kan die pyplyn op verskeie verskillende maniere begin. As ons wou, kan ons dit net plaaslik vanaf 'n terminaal laat loop deur op afstand by GCP aan te meld.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Ons gaan dit egter met DataFlow laat loop. Ons kan dit doen met die onderstaande opdrag deur die volgende vereiste parameters in te stel.

  • project - ID van jou GCP-projek.
  • runner is 'n pyplynloper wat jou program sal ontleed en jou pyplyn sal bou. Om in die wolk te hardloop, moet jy 'n DataflowRunner spesifiseer.
  • staging_location is die pad na die Wolk Dataflow-wolkberging vir die indeksering van die kodepakkette wat benodig word deur die hanteerders wat die werk verrig.
  • temp_location - Pad na die Wolk Dataflow-wolkberging vir die plasing van tydelike werklΓͺers wat tydens die werking van die pyplyn geskep is.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Terwyl hierdie opdrag loop, kan ons na die DataFlow-oortjie in die Google-konsole gaan en ons pyplyn bekyk. Deur op die pyplyn te klik, behoort ons iets soortgelyk aan Figuur 4 te sien. Vir ontfoutingsdoeleindes kan dit baie nuttig wees om na die logs te gaan en dan na Stackdriver om die gedetailleerde logs te sien. Dit het my in 'n aantal gevalle gehelp om probleme met die pyplyn op te los.

Ons skep 'n pyplyn vir stroomdataverwerking. Deel 2
Figuur 4: Balk pyplyn

Toegang tot ons data in BigQuery

Dus, ons behoort reeds 'n pyplyn te hΓͺ met data wat in ons tabel kom. Om dit te toets, kan ons na BigQuery gaan en die data bekyk. Nadat u die opdrag hieronder gebruik het, behoort u die eerste paar reΓ«ls van die datastel te sien. Noudat ons die data in BigQuery gestoor het, kan ons verdere ontleding doen asook die data met kollegas deel en besigheidsvrae begin beantwoord.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Ons skep 'n pyplyn vir stroomdataverwerking. Deel 2
Figuur 5: BigQuery

Gevolgtrekking

Ons hoop dat hierdie pos sal dien as 'n nuttige voorbeeld van die bou van 'n stroomdatapyplyn, sowel as om maniere te vind om data meer toeganklik te maak. Die stoor van data in hierdie formaat bied ons baie voordele. Nou kan ons belangrike vrae begin beantwoord soos hoeveel mense ons produk gebruik? Groei die gebruikersbasis met verloop van tyd? Met watter aspekte van die produk het mense die meeste interaksie? En is daar foute waar dit nie behoort te wees nie? Dit is die vrae wat vir die organisasie van belang sal wees. Gebaseer op die insigte wat uit die antwoorde op hierdie vrae gegenereer word, sal ons die produk kan verbeter en gebruikersbetrokkenheid verhoog.

Beam is regtig nuttig vir hierdie tipe oefening en het ook 'n aantal ander interessante gebruiksgevalle. Byvoorbeeld, jy kan voorraad bosluis data in reΓ«le tyd ontleed en handel dryf op grond van die ontleding, miskien het jy sensordata wat van voertuie af kom en wil jy die verkeersvlakberekening bereken. Jy kan ook byvoorbeeld 'n speletjiemaatskappy wees wat gebruikersdata insamel en dit gebruik om dashboards te skep om sleutelstatistieke op te spoor. Goed, menere, hierdie is 'n onderwerp vir 'n ander plasing, dankie vir die lees, en vir diegene wat die volledige kode wil sien, hieronder is 'n skakel na my GitHub.

https://github.com/DFoly/User_log_pipeline

Dit is alles. Lees die eerste deel.

Bron: will.com

Voeg 'n opmerking