Hola a todos. Compartimos la traducción de la parte final del artículo, preparada específicamente para los alumnos del curso.
Apache Beam y DataFlow para canalizaciones en tiempo real
Configurar Google Cloud
Nota: Utilicé Google Cloud Shell para ejecutar la canalización y publicar datos de registro personalizados porque tenía problemas para ejecutar la canalización en Python 3. Google Cloud Shell usa Python 2, que es más coherente con Apache Beam.
Para iniciar el proceso, debemos profundizar un poco en la configuración. Para aquellos de ustedes que no han usado GCP antes, deberán seguir los siguientes 6 pasos descritos en este
Después de esto, necesitaremos cargar nuestros scripts en Google Cloud Storage y copiarlos a nuestro Google Cloud Shel. Cargar al almacenamiento en la nube es bastante trivial (se puede encontrar una descripción
Figura 2
Los comandos que necesitamos para copiar los archivos e instalar las bibliotecas necesarias se enumeran a continuación.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creando nuestra base de datos y tabla
Una vez que hayamos completado todos los pasos relacionados con la configuración, lo siguiente que debemos hacer es crear un conjunto de datos y una tabla en BigQuery. Hay varias formas de hacer esto, pero la más sencilla es usar la consola de Google Cloud creando primero un conjunto de datos. Puedes seguir los pasos a continuación.
Figura 3. Diseño de la tabla
Publicar datos de registro de usuario
Pub/Sub es un componente crítico de nuestra canalización porque permite que múltiples aplicaciones independientes se comuniquen entre sí. En concreto, funciona como intermediario que nos permite enviar y recibir mensajes entre aplicaciones. Lo primero que debemos hacer es crear un tema. Simplemente vaya a Pub/Sub en la consola y haga clic en CREAR TEMA.
El siguiente código llama a nuestro script para generar los datos de registro definidos anteriormente y luego se conecta y envía los registros a Pub/Sub. Lo único que tenemos que hacer es crear un objeto. EditorCliente, especifique la ruta al tema usando el método topic_path
y llamar a la función publish
с topic_path
y datos. Tenga en cuenta que importamos generate_log_line
de nuestro guión stream_logs
, así que asegúrese de que estos archivos estén en la misma carpeta; de lo contrario, obtendrá un error de importación. Luego podemos ejecutar esto a través de nuestra consola de Google usando:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Tan pronto como se ejecute el archivo, podremos ver la salida de los datos de registro a la consola, como se muestra en la siguiente figura. Este script funcionará mientras no usemos CTRL + Cpara completarlo
Figura 4. Salida publish_logs.py
Escribiendo nuestro código de canalización
Ahora que tenemos todo preparado, podemos comenzar la parte divertida: codificar nuestro canal usando Beam y Python. Para crear una tubería Beam, necesitamos crear un objeto de tubería (p). Una vez que hemos creado un objeto de tubería, podemos aplicar múltiples funciones una tras otra usando el operador pipe (|)
. En general, el flujo de trabajo se parece a la imagen siguiente.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
En nuestro código, crearemos dos funciones personalizadas. Función regex_clean
, que escanea los datos y recupera la fila correspondiente según la lista PATRONES usando la función re.search
. La función devuelve una cadena separada por comas. Si no eres un experto en expresiones regulares, te recomiendo que consultes esto. datetime
dentro de una función para que funcione. Recibí un error de importación al principio del archivo, lo cual fue extraño. Esta lista luego se pasa a la función. Escribir en BigQuery, que simplemente agrega nuestros datos a la tabla. El código para el trabajo de flujo de datos por lotes y el trabajo de flujo de datos en streaming se proporciona a continuación. La única diferencia entre el código por lotes y el de streaming es que en el lote leemos el CSV de src_path
usando la función ReadFromText
de Haz.
Trabajo de flujo de datos por lotes (procesamiento por lotes)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming de trabajo de flujo de datos (procesamiento de flujo)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Iniciando el transportador
Podemos ejecutar la tubería de varias maneras diferentes. Si quisiéramos, podríamos ejecutarlo localmente desde una terminal mientras iniciamos sesión en GCP de forma remota.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Sin embargo, lo ejecutaremos usando DataFlow. Podemos hacer esto usando el siguiente comando configurando los siguientes parámetros requeridos.
project
- ID de su proyecto GCP.runner
es un corredor de canalización que analizará su programa y construirá su canalización. Para ejecutar en la nube, debe especificar un DataflowRunner.staging_location
— la ruta al almacenamiento en la nube de Cloud Dataflow para indexar los paquetes de código que necesitan los procesadores que realizan el trabajo.temp_location
— ruta al almacenamiento en la nube de Cloud Dataflow para almacenar archivos de trabajo temporales creados mientras se ejecuta la canalización.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Mientras se ejecuta este comando, podemos ir a la pestaña DataFlow en la consola de Google y ver nuestra canalización. Cuando hacemos clic en la canalización, deberíamos ver algo similar a la Figura 4. Para fines de depuración, puede ser muy útil ir a Registros y luego a Stackdriver para ver los registros detallados. Esto me ha ayudado a resolver problemas de canalización en varios casos.
Figura 4: Transportador de vigas
Accede a nuestros datos en BigQuery
Entonces, ya deberíamos tener una canalización en ejecución con datos que fluyen hacia nuestra tabla. Para probar esto, podemos ir a BigQuery y mirar los datos. Después de usar el siguiente comando, debería ver las primeras filas del conjunto de datos. Ahora que tenemos los datos almacenados en BigQuery, podemos realizar más análisis, así como compartir los datos con colegas y comenzar a responder preguntas comerciales.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figura 5: Gran Consulta
Conclusión
Esperamos que esta publicación sirva como un ejemplo útil de cómo crear un canal de transmisión de datos, así como también para encontrar formas de hacer que los datos sean más accesibles. Almacenar datos en este formato nos aporta muchas ventajas. Ahora podemos empezar a responder preguntas importantes como ¿cuántas personas utilizan nuestro producto? ¿Su base de usuarios está creciendo con el tiempo? ¿Con qué aspectos del producto interactúa más la gente? ¿Y hay errores donde no debería haberlos? Estas son las preguntas que serán de interés para la organización. Según los conocimientos que surgen de las respuestas a estas preguntas, podemos mejorar el producto y aumentar la participación del usuario.
Beam es realmente útil para este tipo de ejercicio y también tiene otros casos de uso interesantes. Por ejemplo, es posible que desee analizar los datos de cotizaciones bursátiles en tiempo real y realizar operaciones basadas en el análisis, tal vez tenga datos de sensores provenientes de vehículos y desee realizar cálculos del nivel de tráfico. También podría ser, por ejemplo, una empresa de juegos que recopile datos de los usuarios y los utilice para crear paneles para realizar un seguimiento de métricas clave. Bien, caballeros, este es un tema para otra publicación, gracias por leer, y para aquellos que quieran ver el código completo, a continuación está el enlace a mi GitHub.
Eso es todo
Fuente: habr.com