Creem un pipeline de processament de dades de flux. Part 2

Hola a tots. Compartim la traducció de la part final de l'article, elaborada específicament per als alumnes del curs. Enginyer de dades. Podeu llegir la primera part aquí.

Apache Beam i DataFlow per a canonades en temps real

Creem un pipeline de processament de dades de flux. Part 2

Configuració de Google Cloud

Nota: he utilitzat Google Cloud Shell per executar la canalització i publicar dades de registre d'usuari perquè tenia problemes per executar la canalització a Python 3. Google Cloud Shell utilitza Python 2, que és més coherent amb Apache Beam.

Per iniciar el pipeline, hem d'aprofundir una mica en la configuració. Per a aquells de vosaltres que no heu utilitzat GCP abans, haureu de seguir els 6 passos següents que es descriuen en aquest pàgina.

Després d'això, haurem de penjar els nostres scripts a Google Cloud Storage i copiar-los al nostre Google Cloud Shel. La càrrega a l'emmagatzematge al núvol és bastant trivial (es pot trobar una descripció aquí). Per copiar els nostres fitxers, podem obrir Google Cloud Shel des de la barra d'eines fent clic a la primera icona de l'esquerra de la figura 2 següent.

Creem un pipeline de processament de dades de flux. Part 2
Figura 2

Les ordres que necessitem per copiar els fitxers i instal·lar les biblioteques necessàries es mostren a continuació.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creació de la nostra base de dades i taula

Un cop hàgim completat tots els passos relacionats amb la configuració, el següent que hem de fer és crear un conjunt de dades i una taula a BigQuery. Hi ha diverses maneres de fer-ho, però la més senzilla és utilitzar la consola de Google Cloud creant primer un conjunt de dades. Podeu seguir els passos següents enllaçper crear una taula amb un esquema. La nostra taula tindrà 7 columnes, corresponent als components de cada registre d'usuari. Per comoditat, definirem totes les columnes com a cadenes, excepte la variable timelocal, i les anomenarem segons les variables que hem generat anteriorment. La disposició de la nostra taula hauria de semblar a la figura 3.

Creem un pipeline de processament de dades de flux. Part 2
Figura 3. Distribució de la taula

Publicació de dades de registre d'usuari

Pub/Sub és un component crític del nostre pipeline perquè permet que diverses aplicacions independents es comuniquin entre elles. En concret, funciona com a intermediari que ens permet enviar i rebre missatges entre aplicacions. El primer que hem de fer és crear un tema. Simplement aneu a Pub/Sub a la consola i feu clic a CREAR TEMA.

El codi següent crida al nostre script per generar les dades de registre definides anteriorment i després connecta i envia els registres a Pub/Sub. L'únic que hem de fer és crear un objecte PublisherClient, especifiqueu el camí al tema mitjançant el mètode topic_path i crida a la funció publish с topic_path i dades. Tingueu en compte que importem generate_log_line del nostre guió stream_logs, així que assegureu-vos que aquests fitxers estiguin a la mateixa carpeta, en cas contrari obtindreu un error d'importació. Aleshores podem executar-ho a través de la nostra consola de Google fent servir:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Tan bon punt s'executi el fitxer, podrem veure la sortida de les dades de registre a la consola, tal com es mostra a la figura següent. Aquest script funcionarà mentre no l'utilitzem CTRL + Cper completar-lo.

Creem un pipeline de processament de dades de flux. Part 2
Figura 4. Sortida publish_logs.py

Escrivint el nostre codi de pipeline

Ara que ho tenim tot preparat, podem començar la part divertida: codificar la nostra canalització amb Beam i Python. Per crear una canalització Beam, hem de crear un objecte canalització (p). Un cop hem creat un objecte pipeline, podem aplicar múltiples funcions una darrere l'altra mitjançant l'operador pipe (|). En general, el flux de treball s'assembla a la imatge següent.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Al nostre codi, crearem dues funcions personalitzades. Funció regex_clean, que escaneja les dades i recupera la fila corresponent en funció de la llista PATRONS mitjançant la funció re.search. La funció retorna una cadena separada per comes. Si no sou un expert en expressió regular, us recomano que feu una ullada a això tutorial i practicar en un bloc de notes per comprovar el codi. Després d'això definim una funció personalitzada ParDo anomenada divisió, que és una variació de la transformada Beam per al processament paral·lel. A Python, això es fa d'una manera especial: hem de crear una classe que hereta de la classe DoFn Beam. La funció Dividir agafa la fila analitzada de la funció anterior i retorna una llista de diccionaris amb claus corresponents als noms de columnes de la nostra taula de BigQuery. Hi ha alguna cosa a destacar sobre aquesta funció: vaig haver d'importar datetime dins d'una funció perquè funcioni. Vaig rebre un error d'importació al principi del fitxer, que era estrany. A continuació, aquesta llista es passa a la funció WriteToBigQuery, que simplement afegeix les nostres dades a la taula. A continuació es mostra el codi del treball de flux de dades per lots i del treball de flux de dades en continu. L'única diferència entre el codi per lots i el de streaming és que en batch llegim el CSV src_pathutilitzant la funció ReadFromText de Beam.

Treball de flux de dades per lots (processament per lots)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Treball de flux de dades en temps real (processament de flux)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Execució de la canonada

Podem executar el pipeline de diverses maneres diferents. Si volguéssim, podríem executar-lo localment des d'un terminal mentre iniciem sessió a GCP de forma remota.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Tanmateix, l'executarem amb DataFlow. Podem fer-ho mitjançant l'ordre següent establint els següents paràmetres necessaris.

  • project — ID del vostre projecte GCP.
  • runner és un corredor de pipelines que analitzarà el vostre programa i construirà el vostre pipeline. Per executar-se al núvol, heu d'especificar un DataflowRunner.
  • staging_location — el camí a l'emmagatzematge en núvol de Cloud Dataflow per indexar els paquets de codi necessaris pels processadors que realitzen el treball.
  • temp_location — camí a l'emmagatzematge en núvol de Cloud Dataflow per emmagatzemar fitxers de treball temporals creats mentre s'executa la canalització.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Mentre s'executa aquesta ordre, podem anar a la pestanya DataFlow de la consola de Google i veure el nostre pipeline. Quan fem clic a la canalització, hauríem de veure alguna cosa semblant a la figura 4. Per a finalitats de depuració, pot ser molt útil anar a Registres i després a Stackdriver per veure els registres detallats. Això m'ha ajudat a resoldre problemes de canalització en diversos casos.

Creem un pipeline de processament de dades de flux. Part 2
Figura 4: Transportador de biga

Accediu a les nostres dades a BigQuery

Per tant, ja hauríem de tenir un pipeline en funcionament amb dades que flueixin a la nostra taula. Per provar-ho, podem anar a BigQuery i mirar les dades. Després d'utilitzar l'ordre següent, hauríeu de veure les primeres files del conjunt de dades. Ara que tenim les dades emmagatzemades a BigQuery, podem fer més anàlisis, així com compartir-les amb els companys i començar a respondre preguntes empresarials.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Creem un pipeline de processament de dades de flux. Part 2
Figura 5: BigQuery

Conclusió

Esperem que aquesta publicació serveixi d'exemple útil per crear una canalització de dades en streaming, així com per trobar maneres de fer que les dades siguin més accessibles. Emmagatzemar dades en aquest format ens aporta molts avantatges. Ara podem començar a respondre preguntes importants com ara quantes persones fan servir el nostre producte? La vostra base d'usuaris està creixent amb el temps? Amb quins aspectes del producte interactuen més les persones? I hi ha errors on no n'hi hauria d'haver? Aquestes són les preguntes que seran d'interès per a l'organització. A partir dels coneixements que sorgeixen de les respostes a aquestes preguntes, podem millorar el producte i augmentar la implicació dels usuaris.

Beam és realment útil per a aquest tipus d'exercicis i també té una sèrie d'altres casos d'ús interessants. Per exemple, és possible que vulgueu analitzar les dades d'existències en temps real i fer operacions basades en l'anàlisi, potser teniu dades de sensors procedents de vehicles i voleu calcular els càlculs del nivell de trànsit. També podríeu ser, per exemple, una empresa de jocs que recopila dades dels usuaris i les utilitza per crear taulers de control per fer un seguiment de mètriques clau. D'acord, senyors, aquest és un tema per a una altra publicació, gràcies per llegir-lo, i per a aquells que vulguin veure el codi complet, a continuació teniu l'enllaç al meu GitHub.

https://github.com/DFoly/User_log_pipeline

Això és tot. Llegeix la primera part.

Font: www.habr.com

Afegeix comentari