Criamos um pipeline de processamento de dados de fluxo. Parte 2

Olá a todos. Estamos compartilhando a tradução da parte final do artigo, preparada especificamente para os alunos do curso. Engenheiro de dados. Você pode ler a primeira parte aqui.

Apache Beam e DataFlow para pipelines em tempo real

Criamos um pipeline de processamento de dados de fluxo. Parte 2

Configurando o Google Cloud

Observação: usei o Google Cloud Shell para executar o pipeline e publicar dados de log personalizados porque estava tendo problemas para executar o pipeline no Python 3. O Google Cloud Shell usa Python 2, que é mais consistente com o Apache Beam.

Para iniciar o pipeline, precisamos nos aprofundar um pouco nas configurações. Para aqueles que nunca usaram o GCP antes, você precisará seguir as seis etapas a seguir descritas neste página.

Depois disso, precisaremos fazer upload de nossos scripts para o Google Cloud Storage e copiá-los para nosso Google Cloud Shel. O upload para armazenamento em nuvem é bastante trivial (uma descrição pode ser encontrada aqui). Para copiar nossos arquivos, podemos abrir o Google Cloud Shel na barra de ferramentas clicando no primeiro ícone à esquerda na Figura 2 abaixo.

Criamos um pipeline de processamento de dados de fluxo. Parte 2
Figura 2

Os comandos que precisamos para copiar os arquivos e instalar as bibliotecas necessárias estão listados abaixo.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Criando nosso banco de dados e tabela

Depois de concluir todas as etapas relacionadas à configuração, a próxima coisa que precisamos fazer é criar um conjunto de dados e uma tabela no BigQuery. Existem várias maneiras de fazer isso, mas a mais simples é usar o console do Google Cloud criando primeiro um conjunto de dados. Você pode seguir as etapas abaixo linkpara criar uma tabela com um esquema. Nossa mesa terá 7 colunas, correspondente aos componentes de cada log de usuário. Por conveniência, definiremos todas as colunas como strings, exceto a variável timelocal, e as nomearemos de acordo com as variáveis ​​que geramos anteriormente. O layout da nossa tabela deve ser semelhante ao da Figura 3.

Criamos um pipeline de processamento de dados de fluxo. Parte 2
Figura 3. Layout da tabela

Publicando dados de log do usuário

O Pub/Sub é um componente crítico do nosso pipeline porque permite que vários aplicativos independentes se comuniquem entre si. Em particular, funciona como um intermediário que nos permite enviar e receber mensagens entre aplicações. A primeira coisa que precisamos fazer é criar um tópico. Basta acessar o Pub/Sub no console e clicar em CRIAR TÓPICO.

O código abaixo chama nosso script para gerar os dados de log definidos acima e, em seguida, conecta e envia os logs ao Pub/Sub. A única coisa que precisamos fazer é criar um objeto Cliente Editor, especifique o caminho para o tópico usando o método topic_path e chame a função publish с topic_path e dados. Observe que importamos generate_log_line do nosso roteiro stream_logs, portanto, certifique-se de que esses arquivos estejam na mesma pasta, caso contrário, você receberá um erro de importação. Podemos então executar isso através do nosso console do Google usando:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Assim que o arquivo for executado, poderemos ver a saída dos dados de log para o console, conforme mostrado na figura abaixo. Este script funcionará enquanto não usarmos CTRL + Cpara completá-lo.

Criamos um pipeline de processamento de dados de fluxo. Parte 2
Figura 4. Saída publish_logs.py

Escrevendo nosso código de pipeline

Agora que temos tudo preparado, podemos começar a parte divertida: codificar nosso pipeline usando Beam e Python. Para criar um pipeline do Beam, precisamos criar um objeto de pipeline (p). Depois de criarmos um objeto pipeline, podemos aplicar várias funções, uma após a outra, usando o operador pipe (|). Em geral, o fluxo de trabalho se parece com a imagem abaixo.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Em nosso código, criaremos duas funções customizadas. Função regex_clean, que verifica os dados e recupera a linha correspondente com base na lista PATTERNS usando a função re.search. A função retorna uma string separada por vírgula. Se você não é um especialista em expressões regulares, recomendo verificar isto tutorial e pratique em um bloco de notas para verificar o código. Depois disso, definimos uma função ParDo personalizada chamada Split, que é uma variação da transformação Beam para processamento paralelo. Em Python, isso é feito de uma maneira especial - devemos criar uma classe que herde da classe DoFn Beam. A função Split pega a linha analisada da função anterior e retorna uma lista de dicionários com chaves correspondentes aos nomes das colunas em nossa tabela do BigQuery. Há algo a ser observado sobre esta função: tive que importar datetime dentro de uma função para fazê-la funcionar. Eu estava recebendo um erro de importação no início do arquivo, o que era estranho. Esta lista é então passada para a função WriteToBigQuery, que simplesmente adiciona nossos dados à tabela. O código para Batch DataFlow Job e Streaming DataFlow Job é fornecido abaixo. A única diferença entre o código em lote e o código de streaming é que no lote lemos o CSV de src_pathusando a função ReadFromText da viga.

Tarefa DataFlow em lote (processamento em lote)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Job de streaming do DataFlow (processamento de stream)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Iniciando o transportador

Podemos executar o pipeline de várias maneiras diferentes. Se quiséssemos, poderíamos simplesmente executá-lo localmente a partir de um terminal enquanto efetuamos login no GCP remotamente.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

No entanto, vamos executá-lo usando DataFlow. Podemos fazer isso usando o comando abaixo, definindo os seguintes parâmetros necessários.

  • project — ID do seu projeto GCP.
  • runner é um executor de pipeline que analisará seu programa e construirá seu pipeline. Para executar na nuvem, você deve especificar um DataflowRunner.
  • staging_location — o caminho para o armazenamento em nuvem do Cloud Dataflow para indexar pacotes de códigos necessários aos processadores que executam o trabalho.
  • temp_location — caminho para o armazenamento em nuvem do Cloud Dataflow para armazenar arquivos de trabalho temporários criados enquanto o pipeline está em execução.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Enquanto este comando está em execução, podemos ir para a guia DataFlow no console do Google e visualizar nosso pipeline. Ao clicar no pipeline, devemos ver algo semelhante à Figura 4. Para fins de depuração, pode ser muito útil ir para Logs e depois para Stackdriver para visualizar os logs detalhados. Isso me ajudou a resolver problemas de pipeline em vários casos.

Criamos um pipeline de processamento de dados de fluxo. Parte 2
Figura 4: Transportador de viga

Acesse nossos dados no BigQuery

Portanto, já devemos ter um pipeline em execução com dados fluindo para nossa tabela. Para testar isso, podemos acessar o BigQuery e observar os dados. Depois de usar o comando abaixo, você deverá ver as primeiras linhas do conjunto de dados. Agora que temos os dados armazenados no BigQuery, podemos realizar análises mais aprofundadas, bem como compartilhar os dados com colegas e começar a responder perguntas de negócios.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Criamos um pipeline de processamento de dados de fluxo. Parte 2
Figura 5: BigQuery

Conclusão

Esperamos que esta postagem sirva como um exemplo útil de criação de um pipeline de streaming de dados, bem como de como encontrar maneiras de tornar os dados mais acessíveis. Armazenar dados neste formato nos oferece muitas vantagens. Agora podemos começar a responder questões importantes como quantas pessoas usam nosso produto? Sua base de usuários está crescendo com o tempo? Com quais aspectos do produto as pessoas interagem mais? E existem erros onde não deveriam existir? Estas são as questões que serão de interesse da organização. Com base nos insights que surgem das respostas a essas perguntas, podemos melhorar o produto e aumentar o envolvimento do usuário.

O Beam é realmente útil para esse tipo de exercício e também possui vários outros casos de uso interessantes. Por exemplo, você pode querer analisar dados de stock ticks em tempo real e fazer negociações com base na análise, talvez você tenha dados de sensores vindos de veículos e queira fazer cálculos de nível de tráfego. Você também pode, por exemplo, ser uma empresa de jogos que coleta dados do usuário e os utiliza para criar painéis para rastrear as principais métricas. Ok, senhores, isso é assunto para outro post, obrigado pela leitura, e para quem quiser ver o código completo, abaixo está o link para meu GitHub.

https://github.com/DFoly/User_log_pipeline

Isso é tudo. Leia a primeira parte.

Fonte: habr.com

Adicionar um comentário