Wy meitsje in pipeline foar streamgegevensferwurking. Diel 2

Hoi allegearre. Wy diele de oersetting fan it lêste diel fan it artikel, spesifyk taret foar studinten fan 'e kursus. Data Engineer. Jo kinne it earste diel lêze hjir.

Apache Beam en DataFlow foar Real-Time Pipelines

Wy meitsje in pipeline foar streamgegevensferwurking. Diel 2

Google Cloud ynstelle

Opmerking: ik brûkte Google Cloud Shell om de pipeline út te fieren en oanpaste loggegevens te publisearjen, om't ik problemen hie mei it útfieren fan de pipeline yn Python 3. Google Cloud Shell brûkt Python 2, wat mear konsekwint is mei Apache Beam.

Om de pipeline te begjinnen, moatte wy in bytsje grave yn 'e ynstellingen. Foar dy fan jimme dy't GCP net earder hawwe brûkt, moatte jo de folgjende 6 stappen folgje dy't yn dizze beskreaun binne side.

Hjirnei moatte wy ús skripts uploade nei Google Cloud Storage en kopiearje nei ús Google Cloud Shel. Uploaden nei wolk opslach is frij triviaal (in beskriuwing kin fûn wurde hjir). Om ús bestannen te kopiearjen, kinne wy ​​​​Google Cloud Shel iepenje fan 'e arkbalke troch te klikken op it earste ikoan links yn figuer 2 hjirûnder.

Wy meitsje in pipeline foar streamgegevensferwurking. Diel 2
Ofbylding 2

De kommando's dy't wy nedich binne om de bestannen te kopiearjen en de fereaske biblioteken te ynstallearjen binne hjirûnder neamd.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

It meitsjen fan ús databank en tabel

Sadree't wy hawwe foltôge alle opset relatearre stappen, it folgjende ding dat wy moatte dwaan is meitsje in dataset en tabel yn BigQuery. D'r binne ferskate manieren om dit te dwaan, mar de ienfâldichste is om de Google Cloud-konsole te brûken troch earst in dataset te meitsjen. Jo kinne de stappen hjirûnder folgje linkom in tabel te meitsjen mei in skema. Us tafel sil hawwe 7 kolom, oerienkommende mei de komponinten fan elke brûkerslog. Foar gemak sille wy alle kolommen definiearje as snaren, útsein de timelocal fariabele, en neame se neffens de fariabelen dy't wy earder genereare. De yndieling fan ús tabel moat der útsjen as yn figuer 3.

Wy meitsje in pipeline foar streamgegevensferwurking. Diel 2
figuer 3. tabel layout

It publisearjen fan brûkersloggegevens

Pub/Sub is in kritysk ûnderdiel fan ús pipeline, om't it meardere ûnôfhinklike applikaasjes mooglik makket om mei elkoar te kommunisearjen. Benammen wurket it as in tuskenpersoan wêrtroch ús berjochten ferstjoere en ûntfange kinne tusken applikaasjes. It earste ding dat wy moatte dwaan is in ûnderwerp oanmeitsje. Gean gewoan nei Pub / Sub yn 'e konsole en klikje ONDERWERK CREATE.

De koade hjirûnder neamt ús skript om de hjirboppe definieare loggegevens te generearjen en ferbynt en stjoert de logs dan nei Pub/Sub. It iennichste wat wy moatte dwaan is in objekt meitsje PublisherClient, spesifisearje it paad nei it ûnderwerp mei de metoade topic_path en belje de funksje publish с topic_path en gegevens. Tink derom dat wy ymportearje generate_log_line út ús skript stream_logs, soargje derfoar dat dizze bestannen yn deselde map steane, oars krije jo in ymportflater. Wy kinne dit dan útfiere fia ús Google-konsole mei:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Sadree't it bestân rint, sille wy de útfier fan 'e loggegevens nei de konsole sjen kinne, lykas werjûn yn' e ôfbylding hjirûnder. Dit skript sil wurkje salang't wy net brûke CTRL + Com it te foltôgjen.

Wy meitsje in pipeline foar streamgegevensferwurking. Diel 2
figuer 4. Utfier publish_logs.py

Us pipeline-koade skriuwe

No't wy alles klear hawwe, kinne wy ​​​​it leuke diel begjinne - ús pipeline kodearje mei Beam en Python. Om in Beam-pipeline te meitsjen, moatte wy in pipeline-objekt (p) meitsje. Sadree't wy hawwe makke in pipeline foarwerp, wy kinne tapasse meardere funksjes ien nei de oare mei help fan de operator pipe (|). Yn 't algemien liket de workflow as de ôfbylding hjirûnder.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Yn ús koade sille wy twa oanpaste funksjes oanmeitsje. Funksje regex_clean, dy't de gegevens scant en de oerienkommende rige ophelje op basis fan de PATTERNS list mei de funksje re.search. De funksje jout in komma-skieden tekenrige werom. As jo ​​​​gjin ekspert op reguliere ekspresje binne, advisearje ik dit te kontrolearjen tutorial en oefenje yn in notepad om de koade te kontrolearjen. Nei dit wy definiearje in oanpaste ParDo funksje neamd Spjalte, dat is in fariaasje fan 'e Beam-transformaasje foar parallelle ferwurking. Yn Python wurdt dit op in spesjale manier dien - wy moatte in klasse meitsje dy't erft fan 'e DoFn Beam-klasse. De Split-funksje nimt de parsearde rige fan 'e foarige funksje en jout in list mei wurdboeken werom mei kaaien dy't oerienkomme mei de kolomnammen yn ús BigQuery-tabel. D'r is wat te merken oer dizze funksje: ik moast ymportearje datetime binnen in funksje om it te wurkjen. Ik krige in ymportflater oan it begjin fan it bestân, dat wie nuver. Dizze list wurdt dan trochjûn oan de funksje WriteToBigQuery, dy't gewoan ús gegevens taheakket oan 'e tabel. De koade foar Batch DataFlow Job en Streaming DataFlow Job wurdt hjirûnder jûn. It ienige ferskil tusken batch en streaming koade is dat yn batch wy lêze de CSV út src_pathmei help fan de funksje ReadFromText fan Beam.

Batch DataFlow Job (batchferwurking)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (streamferwurking)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Begjin fan de transportband

Wy kinne de pipeline op ferskate ferskillende manieren útfiere. As wy woenen, koenen wy it gewoan lokaal útfiere fan in terminal by it oanmelden by GCP op ôfstân.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Wy sille it lykwols útfiere mei DataFlow. Wy kinne dit dwaan mei it ûnderste kommando troch de folgjende fereaske parameters yn te stellen.

  • project - ID fan jo GCP-projekt.
  • runner is in pipeline runner dy't jo programma sil analysearje en jo pipeline sil konstruearje. Om yn 'e wolk te rinnen, moatte jo in DataflowRunner opjaan.
  • staging_location - it paad nei de Cloud Dataflow-wolkopslach foar yndeksearjen fan koadepakketten dy't nedich binne troch de processors dy't it wurk útfiere.
  • temp_location - paad nei de Cloud Dataflow-wolkopslach foar it bewarjen fan tydlike taakbestannen makke wylst de pipeline rint.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Wylst dit kommando rint, kinne wy ​​​​nei it tabblêd DataFlow yn 'e google-konsole gean en ús pipeline besjen. As wy op 'e pipeline klikke, soene wy ​​​​wat sjen moatte dat liket op figuer 4. Foar debuggen kin it tige nuttich wêze om nei Logs te gean en dan nei Stackdriver om detaillearre logs te besjen. Dit hat my holpen by it oplossen fan pipelineproblemen yn in oantal gefallen.

Wy meitsje in pipeline foar streamgegevensferwurking. Diel 2
figuer 4: Beam conveyor

Tagong ta ús gegevens yn BigQuery

Dat, wy moatte al in pipeline hawwe dy't rint mei gegevens dy't yn ús tabel streame. Om dit te testen, kinne wy ​​​​nei BigQuery gean en de gegevens besjen. Nei it brûken fan it kommando hjirûnder moatte jo de earste pear rigen fan 'e dataset sjen. No't wy de gegevens hawwe opslein yn BigQuery, kinne wy ​​fierdere analyse útfiere, lykas de gegevens diele mei kollega's en begjinne mei it beantwurdzjen fan saaklike fragen.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Wy meitsje in pipeline foar streamgegevensferwurking. Diel 2
figuer 5: BigQuery

konklúzje

Wy hoopje dat dit berjocht tsjinnet as in nuttich foarbyld fan it meitsjen fan in streaminggegevenspipeline, lykas ek manieren fine om gegevens tagonkliker te meitsjen. It opslaan fan gegevens yn dit formaat jout ús in protte foardielen. No kinne wy ​​begjinne mei it beantwurdzjen fan wichtige fragen lykas hoefolle minsken ús produkt brûke? Groeit jo brûkersbasis oer de tiid? Mei hokker aspekten fan it produkt omgean minsken it meast? En binne d'r flaters wêr't net wêze moatte? Dit binne de fragen dy't ynteressearje sille foar de organisaasje. Op grûn fan de ynsjoggen dy't fuortkomme út 'e antwurden op dizze fragen, kinne wy ​​it produkt ferbetterje en de belutsenens fan brûkers ferheegje.

Beam is echt nuttich foar dit soarte fan oefening en hat ek in oantal oare nijsgjirrige gebrûk gefallen. Jo kinne bygelyks stock tickgegevens yn realtime analysearje en hannelingen meitsje op basis fan 'e analyse, miskien hawwe jo sensorgegevens dy't komme fan auto's en wolle berekkeningen fan ferkearsnivo berekkenje. Jo kinne bygelyks ek in gamingbedriuw wêze dat brûkersgegevens sammelt en brûkt om dashboards te meitsjen om wichtige metriken te folgjen. Okee, hearen, dit is in ûnderwerp foar in oare post, tank foar it lêzen, en foar dyjingen dy't de folsleine koade wolle sjen, hjirûnder is de keppeling nei myn GitHub.

https://github.com/DFoly/User_log_pipeline

Da's alles. Lês diel ien.

Boarne: www.habr.com

Add a comment