Creamos unha canalización de procesamento de datos. Parte 2

Ola a todos. Compartimos a tradución da parte final do artigo, elaborada especificamente para o alumnado do curso. Enxeñeiro de datos. Podes ler a primeira parte aquí.

Apache Beam e DataFlow para canalizacións en tempo real

Creamos unha canalización de procesamento de datos. Parte 2

Configurando Google Cloud

Nota: usei Google Cloud Shell para executar a canalización e publicar datos de rexistro personalizados porque estaba a ter problemas para executar a canalización en Python 3. Google Cloud Shell usa Python 2, que é máis consistente con Apache Beam.

Para comezar a canalización, necesitamos afondar un pouco na configuración. Para aqueles de vostedes que non usaron GCP antes, terán que seguir os seguintes 6 pasos descritos neste páxina.

Despois diso, teremos que cargar os nosos scripts en Google Cloud Storage e copialos no noso Google Cloud Shel. A carga no almacenamento na nube é bastante trivial (pódese atopar unha descrición aquí). Para copiar os nosos ficheiros, podemos abrir Google Cloud Shel desde a barra de ferramentas facendo clic na primeira icona da esquerda na Figura 2 a continuación.

Creamos unha canalización de procesamento de datos. Parte 2
Imaxe 2

Os comandos que necesitamos para copiar os ficheiros e instalar as bibliotecas necesarias están listados a continuación.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creando a nosa base de datos e táboa

Unha vez que completamos todos os pasos relacionados coa configuración, o seguinte que debemos facer é crear un conxunto de datos e unha táboa en BigQuery. Hai varias formas de facelo, pero o máis sinxelo é usar a consola de Google Cloud creando primeiro un conxunto de datos. Podes seguir os pasos a continuación Ligazónpara crear unha táboa cun esquema. A nosa mesa terá 7 columnas, correspondentes aos compoñentes de cada rexistro de usuario. Por comodidade, definiremos todas as columnas como cadeas, excepto a variable timelocal, e nomearémolas segundo as variables que xeramos anteriormente. A disposición da nosa táboa debería parecer na Figura 3.

Creamos unha canalización de procesamento de datos. Parte 2
Figura 3. Disposición da táboa

Publicación de datos de rexistro de usuarios

Pub/Sub é un compoñente fundamental do noso pipeline porque permite que varias aplicacións independentes se comuniquen entre si. En concreto, funciona como intermediario que nos permite enviar e recibir mensaxes entre aplicacións. O primeiro que debemos facer é crear un tema. Simplemente vai a Pub/Sub na consola e fai clic en CREAR TEMA.

O seguinte código chama ao noso script para xerar os datos de rexistro definidos anteriormente e, a continuación, conecta e envía os rexistros a Pub/Sub. O único que temos que facer é crear un obxecto PublisherClient, especifique o camiño ao tema mediante o método topic_path e chamar á función publish с topic_path e datos. Teña en conta que importamos generate_log_line do noso guión stream_logs, así que asegúrese de que estes ficheiros estean no mesmo cartafol, se non, recibirá un erro de importación. Despois podemos executalo a través da nosa consola de Google usando:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

En canto se execute o ficheiro, poderemos ver a saída dos datos de rexistro para a consola, como se mostra na figura a continuación. Este script funcionará mentres non o usemos CTRL + Cpara completalo.

Creamos unha canalización de procesamento de datos. Parte 2
Figura 4. Saída publish_logs.py

Escribindo o noso código de pipeline

Agora que temos todo preparado, podemos comezar a parte divertida: codificar o noso pipeline usando Beam e Python. Para crear unha canalización Beam, necesitamos crear un obxecto canalización (p). Unha vez que creamos un obxecto pipeline, podemos aplicar varias funcións unha tras outra usando o operador pipe (|). En xeral, o fluxo de traballo parece a imaxe de abaixo.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

No noso código, crearemos dúas funcións personalizadas. Función regex_clean, que escanea os datos e recupera a fila correspondente en función da lista PATRÓNS mediante a función re.search. A función devolve unha cadea separada por comas. Se non es un experto en expresións regulares, recoméndoche consultar isto titorial e practica nun bloc de notas para comprobar o código. Despois diso definimos unha función personalizada de ParDo chamada división, que é unha variación da transformada Beam para procesamento paralelo. En Python, isto faise dun xeito especial: debemos crear unha clase que herde da clase DoFn Beam. A función Dividir toma a fila analizada da función anterior e devolve unha lista de dicionarios coas claves correspondentes aos nomes das columnas da nosa táboa de BigQuery. Hai algo que destacar sobre esta función: tiven que importar datetime dentro dunha función para que funcione. Recibía un erro de importación ao comezo do ficheiro, que era raro. Esta lista pásase entón á función WriteToBigQuery, que simplemente engade os nosos datos á táboa. O código para o traballo de fluxo de datos por lotes e o traballo de fluxo de datos en tempo real é o seguinte. A única diferenza entre o código de lote e o de streaming é que en lote lemos o CSV desde src_pathutilizando a función ReadFromText de Beam.

Traballo de fluxo de datos por lotes (procesamento por lotes)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming DataFlow Job (procesamento de fluxos)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Arranque do transportador

Podemos executar o gasoduto de varias formas diferentes. Se quixeramos, poderiamos executalo localmente desde un terminal mentres iniciamos sesión en GCP de forma remota.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Non obstante, imos executalo usando DataFlow. Podemos facelo usando o seguinte comando configurando os seguintes parámetros necesarios.

  • project — ID do teu proxecto GCP.
  • runner é un corredor de pipeline que analizará o teu programa e construirá o teu pipeline. Para executarse na nube, debes especificar un DataflowRunner.
  • staging_location — o camiño ao almacenamento na nube de Cloud Dataflow para indexar os paquetes de código necesarios polos procesadores que realizan o traballo.
  • temp_location — ruta ao almacenamento na nube de Cloud Dataflow para almacenar ficheiros de traballo temporais creados mentres a canalización está en execución.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Mentres se executa este comando, podemos ir á pestana DataFlow da consola de Google e ver o noso pipeline. Cando facemos clic na canalización, deberíamos ver algo semellante á Figura 4. Para fins de depuración, pode ser moi útil ir a Rexistros e despois a Stackdriver para ver os rexistros detallados. Isto axudoume a resolver problemas de canalización en varios casos.

Creamos unha canalización de procesamento de datos. Parte 2
Figura 4: Transportador de vigas

Accede aos nosos datos en BigQuery

Polo tanto, xa deberíamos ter unha canalización en execución con datos que flúen na nosa táboa. Para probar isto, podemos ir a BigQuery e mirar os datos. Despois de usar o seguinte comando, deberías ver as primeiras filas do conxunto de datos. Agora que temos os datos almacenados en BigQuery, podemos realizar máis análises, así como compartir os datos cos compañeiros e comezar a responder preguntas empresariais.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Creamos unha canalización de procesamento de datos. Parte 2
Figura 5: BigQuery

Conclusión

Agardamos que esta publicación sirva de exemplo útil para crear unha canalización de datos en streaming, así como para atopar formas de facer que os datos sexan máis accesibles. Almacenar datos neste formato ofrécenos moitas vantaxes. Agora podemos comezar a responder preguntas importantes como cantas persoas están a usar o noso produto? A túa base de usuarios está crecendo co paso do tempo? Con que aspectos do produto interactúan máis as persoas? E hai erros onde non debería haber? Estas son as preguntas que serán de interese para a organización. A partir das ideas que se desprenden das respostas a estas preguntas, podemos mellorar o produto e aumentar a implicación dos usuarios.

Beam é realmente útil para este tipo de exercicios e tamén ten outros casos de uso interesantes. Por exemplo, pode querer analizar os datos das accións en tempo real e facer transaccións en función da análise, quizais teña datos de sensores procedentes de vehículos e queira calcular os cálculos do nivel de tráfico. Tamén podes, por exemplo, ser unha empresa de xogos que recompila datos dos usuarios e utilízaos para crear paneis de control para rastrexar as principais métricas. Vale, señores, este é un tema para outra publicación, grazas por ler, e para aqueles que queiran ver o código completo, a continuación está a ligazón ao meu GitHub.

https://github.com/DFoly/User_log_pipeline

Isto é todo. Ler a primeira parte.

Fonte: www.habr.com

Engadir un comentario