Creamos una canalización de procesamiento de datos de flujo. Parte 2

Hola a todos. Compartimos la traducción de la parte final del artículo, preparada específicamente para los alumnos del curso. Ingeniero de datos. Puedes leer la primera parte. aquí.

Apache Beam y DataFlow para canalizaciones en tiempo real

Creamos una canalización de procesamiento de datos de flujo. Parte 2

Configurar Google Cloud

Nota: Utilicé Google Cloud Shell para ejecutar la canalización y publicar datos de registro personalizados porque tenía problemas para ejecutar la canalización en Python 3. Google Cloud Shell usa Python 2, que es más coherente con Apache Beam.

Para iniciar el proceso, debemos profundizar un poco en la configuración. Para aquellos de ustedes que no han usado GCP antes, deberán seguir los siguientes 6 pasos descritos en este página.

Después de esto, necesitaremos cargar nuestros scripts en Google Cloud Storage y copiarlos a nuestro Google Cloud Shel. Cargar al almacenamiento en la nube es bastante trivial (se puede encontrar una descripción aquí). Para copiar nuestros archivos, podemos abrir Google Cloud Shel desde la barra de herramientas haciendo clic en el primer ícono a la izquierda en la Figura 2 a continuación.

Creamos una canalización de procesamiento de datos de flujo. Parte 2
Figura 2

Los comandos que necesitamos para copiar los archivos e instalar las bibliotecas necesarias se enumeran a continuación.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creando nuestra base de datos y tabla

Una vez que hayamos completado todos los pasos relacionados con la configuración, lo siguiente que debemos hacer es crear un conjunto de datos y una tabla en BigQuery. Hay varias formas de hacer esto, pero la más sencilla es usar la consola de Google Cloud creando primero un conjunto de datos. Puedes seguir los pasos a continuación. enlacepara crear una tabla con un esquema. Nuestra mesa tendrá 7 columnas, correspondiente a los componentes de cada registro de usuario. Por conveniencia, definiremos todas las columnas como cadenas, excepto la variable timelocal, y las nombraremos de acuerdo con las variables que generamos anteriormente. El diseño de nuestra tabla debería verse como en la Figura 3.

Creamos una canalización de procesamiento de datos de flujo. Parte 2
Figura 3. Diseño de la tabla

Publicar datos de registro de usuario

Pub/Sub es un componente crítico de nuestra canalización porque permite que múltiples aplicaciones independientes se comuniquen entre sí. En concreto, funciona como intermediario que nos permite enviar y recibir mensajes entre aplicaciones. Lo primero que debemos hacer es crear un tema. Simplemente vaya a Pub/Sub en la consola y haga clic en CREAR TEMA.

El siguiente código llama a nuestro script para generar los datos de registro definidos anteriormente y luego se conecta y envía los registros a Pub/Sub. Lo único que tenemos que hacer es crear un objeto. EditorCliente, especifique la ruta al tema usando el método topic_path y llamar a la función publish с topic_path y datos. Tenga en cuenta que importamos generate_log_line de nuestro guión stream_logs, así que asegúrese de que estos archivos estén en la misma carpeta; de lo contrario, obtendrá un error de importación. Luego podemos ejecutar esto a través de nuestra consola de Google usando:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Tan pronto como se ejecute el archivo, podremos ver la salida de los datos de registro a la consola, como se muestra en la siguiente figura. Este script funcionará mientras no usemos CTRL + Cpara completarlo

Creamos una canalización de procesamiento de datos de flujo. Parte 2
Figura 4. Salida publish_logs.py

Escribiendo nuestro código de canalización

Ahora que tenemos todo preparado, podemos comenzar la parte divertida: codificar nuestro canal usando Beam y Python. Para crear una tubería Beam, necesitamos crear un objeto de tubería (p). Una vez que hemos creado un objeto de tubería, podemos aplicar múltiples funciones una tras otra usando el operador pipe (|). En general, el flujo de trabajo se parece a la imagen siguiente.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

En nuestro código, crearemos dos funciones personalizadas. Función regex_clean, que escanea los datos y recupera la fila correspondiente según la lista PATRONES usando la función re.search. La función devuelve una cadena separada por comas. Si no eres un experto en expresiones regulares, te recomiendo que consultes esto. tutorial y practica en un bloc de notas para comprobar el código. Después de esto definimos una función ParDo personalizada llamada Mini Split, que es una variación de la transformada Beam para procesamiento paralelo. En Python, esto se hace de una manera especial: debemos crear una clase que herede de la clase DoFn Beam. La función Dividir toma la fila analizada de la función anterior y devuelve una lista de diccionarios con claves correspondientes a los nombres de las columnas en nuestra tabla de BigQuery. Hay algo a tener en cuenta sobre esta función: tuve que importar datetime dentro de una función para que funcione. Recibí un error de importación al principio del archivo, lo cual fue extraño. Esta lista luego se pasa a la función. Escribir en BigQuery, que simplemente agrega nuestros datos a la tabla. El código para el trabajo de flujo de datos por lotes y el trabajo de flujo de datos en streaming se proporciona a continuación. La única diferencia entre el código por lotes y el de streaming es que en el lote leemos el CSV de src_pathusando la función ReadFromText de Haz.

Trabajo de flujo de datos por lotes (procesamiento por lotes)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming de trabajo de flujo de datos (procesamiento de flujo)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Iniciando el transportador

Podemos ejecutar la tubería de varias maneras diferentes. Si quisiéramos, podríamos ejecutarlo localmente desde una terminal mientras iniciamos sesión en GCP de forma remota.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Sin embargo, lo ejecutaremos usando DataFlow. Podemos hacer esto usando el siguiente comando configurando los siguientes parámetros requeridos.

  • project - ID de su proyecto GCP.
  • runner es un corredor de canalización que analizará su programa y construirá su canalización. Para ejecutar en la nube, debe especificar un DataflowRunner.
  • staging_location — la ruta al almacenamiento en la nube de Cloud Dataflow para indexar los paquetes de código que necesitan los procesadores que realizan el trabajo.
  • temp_location — ruta al almacenamiento en la nube de Cloud Dataflow para almacenar archivos de trabajo temporales creados mientras se ejecuta la canalización.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Mientras se ejecuta este comando, podemos ir a la pestaña DataFlow en la consola de Google y ver nuestra canalización. Cuando hacemos clic en la canalización, deberíamos ver algo similar a la Figura 4. Para fines de depuración, puede ser muy útil ir a Registros y luego a Stackdriver para ver los registros detallados. Esto me ha ayudado a resolver problemas de canalización en varios casos.

Creamos una canalización de procesamiento de datos de flujo. Parte 2
Figura 4: Transportador de vigas

Accede a nuestros datos en BigQuery

Entonces, ya deberíamos tener una canalización en ejecución con datos que fluyen hacia nuestra tabla. Para probar esto, podemos ir a BigQuery y mirar los datos. Después de usar el siguiente comando, debería ver las primeras filas del conjunto de datos. Ahora que tenemos los datos almacenados en BigQuery, podemos realizar más análisis, así como compartir los datos con colegas y comenzar a responder preguntas comerciales.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Creamos una canalización de procesamiento de datos de flujo. Parte 2
Figura 5: Gran Consulta

Conclusión

Esperamos que esta publicación sirva como un ejemplo útil de cómo crear un canal de transmisión de datos, así como también para encontrar formas de hacer que los datos sean más accesibles. Almacenar datos en este formato nos aporta muchas ventajas. Ahora podemos empezar a responder preguntas importantes como ¿cuántas personas utilizan nuestro producto? ¿Su base de usuarios está creciendo con el tiempo? ¿Con qué aspectos del producto interactúa más la gente? ¿Y hay errores donde no debería haberlos? Estas son las preguntas que serán de interés para la organización. Según los conocimientos que surgen de las respuestas a estas preguntas, podemos mejorar el producto y aumentar la participación del usuario.

Beam es realmente útil para este tipo de ejercicio y también tiene otros casos de uso interesantes. Por ejemplo, es posible que desee analizar los datos de cotizaciones bursátiles en tiempo real y realizar operaciones basadas en el análisis, tal vez tenga datos de sensores provenientes de vehículos y desee realizar cálculos del nivel de tráfico. También podría ser, por ejemplo, una empresa de juegos que recopile datos de los usuarios y los utilice para crear paneles para realizar un seguimiento de métricas clave. Bien, caballeros, este es un tema para otra publicación, gracias por leer, y para aquellos que quieran ver el código completo, a continuación está el enlace a mi GitHub.

https://github.com/DFoly/User_log_pipeline

Eso es todo leer la primera parte.

Fuente: habr.com

Añadir un comentario