Creiamo una pipeline di elaborazione dei dati in flusso. Parte 2

Ciao a tutti. Condividiamo la traduzione della parte finale dell'articolo, preparata appositamente per gli studenti del corso. Ingegnere dei dati. Puoi leggere la prima parte qui.

Apache Beam e DataFlow per pipeline in tempo reale

Creiamo una pipeline di elaborazione dei dati in flusso. Parte 2

Configurazione di Google Cloud

Nota: ho utilizzato Google Cloud Shell per eseguire la pipeline e pubblicare dati di log personalizzati perché avevo problemi con l'esecuzione della pipeline in Python 3. Google Cloud Shell utilizza Python 2, che è più coerente con Apache Beam.

Per avviare la pipeline, dobbiamo scavare un po' nelle impostazioni. Quelli di voi che non hanno mai utilizzato GCP prima, dovranno seguire i seguenti 6 passaggi descritti in questo pagina.

Successivamente, dovremo caricare i nostri script su Google Cloud Storage e copiarli sul nostro Google Cloud Shel. Il caricamento nell'archivio cloud è piuttosto banale (è possibile trovare una descrizione qui). Per copiare i nostri file, possiamo aprire Google Cloud Shel dalla barra degli strumenti facendo clic sulla prima icona a sinistra nella Figura 2 qui sotto.

Creiamo una pipeline di elaborazione dei dati in flusso. Parte 2
Figura 2

Di seguito sono elencati i comandi necessari per copiare i file e installare le librerie richieste.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creazione del nostro database e tabella

Una volta completati tutti i passaggi relativi alla configurazione, la prossima cosa che dobbiamo fare è creare un set di dati e una tabella in BigQuery. Esistono diversi modi per farlo, ma il più semplice è utilizzare la console Google Cloud creando prima un set di dati. Puoi seguire i passaggi seguenti collegamentoper creare una tabella con uno schema. Il nostro tavolo avrà 7 colonne, corrispondente ai componenti di ciascun registro utente. Per comodità, definiremo tutte le colonne come stringhe, ad eccezione della variabile timelocal, e le chiameremo in base alle variabili che abbiamo generato in precedenza. Il layout della nostra tabella dovrebbe apparire come nella Figura 3.

Creiamo una pipeline di elaborazione dei dati in flusso. Parte 2
Figura 3. Disposizione della tabella

Pubblicazione dei dati di registro utente

Pub/Sub è un componente fondamentale della nostra pipeline perché consente a più applicazioni indipendenti di comunicare tra loro. In particolare, funziona come un intermediario che ci permette di inviare e ricevere messaggi tra applicazioni. La prima cosa che dobbiamo fare è creare un argomento. Basta andare su Pub/Sub nella console e fare clic su CREA ARGOMENTO.

Il codice seguente richiama il nostro script per generare i dati di log definiti sopra, quindi si connette e invia i log a Pub/Sub. L'unica cosa che dobbiamo fare è creare un oggetto Cliente editore, specificare il percorso dell'argomento utilizzando il metodo topic_path e chiamare la funzione publish с topic_path e dati. Tieni presente che importiamo generate_log_line dalla nostra sceneggiatura stream_logs, quindi assicurati che questi file siano nella stessa cartella, altrimenti riceverai un errore di importazione. Possiamo quindi eseguirlo tramite la nostra console Google utilizzando:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

Non appena il file verrà eseguito, potremo vedere l'output dei dati di registro sulla console, come mostrato nella figura seguente. Questo script funzionerà finché non lo utilizzeremo CTRL + Cper completarlo.

Creiamo una pipeline di elaborazione dei dati in flusso. Parte 2
Figura 4. Uscita publish_logs.py

Scrivere il codice della nostra pipeline

Ora che abbiamo tutto preparato, possiamo iniziare la parte divertente: codificare la nostra pipeline utilizzando Beam e Python. Per creare una pipeline Beam, dobbiamo creare un oggetto pipeline (p). Una volta creato un oggetto pipeline, possiamo applicare più funzioni una dopo l'altra utilizzando l'operatore pipe (|). In generale, il flusso di lavoro è simile all'immagine seguente.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

Nel nostro codice creeremo due funzioni personalizzate. Funzione regex_clean, che esegue la scansione dei dati e recupera la riga corrispondente in base all'elenco PATTERNS utilizzando la funzione re.search. La funzione restituisce una stringa separata da virgole. Se non sei un esperto di espressioni regolari, ti consiglio di dare un'occhiata a questo tutorial ed esercitati su un blocco note per controllare il codice. Successivamente definiamo una funzione ParDo personalizzata chiamata Diviso, che è una variazione della trasformazione Beam per l'elaborazione parallela. In Python, questo viene fatto in un modo speciale: dobbiamo creare una classe che erediti dalla classe DoFn Beam. La funzione Dividi prende la riga analizzata dalla funzione precedente e restituisce un elenco di dizionari con chiavi corrispondenti ai nomi di colonna nella nostra tabella BigQuery. C'è qualcosa da notare su questa funzione: ho dovuto importare datetime all'interno di una funzione per farla funzionare. Ricevevo un errore di importazione all'inizio del file, il che era strano. Questo elenco viene quindi passato alla funzione WriteToBigQuery, che aggiunge semplicemente i nostri dati alla tabella. Di seguito è riportato il codice per il lavoro Batch DataFlow e il lavoro Streaming DataFlow. L'unica differenza tra codice batch e codice streaming è che in batch leggiamo il CSV src_pathutilizzando la funzione ReadFromText da Fascio.

Lavoro DataFlow batch (elaborazione batch)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Streaming processo DataFlow (elaborazione del flusso)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

Avvio del trasportatore

Possiamo eseguire la pipeline in diversi modi. Se volessimo, potremmo semplicemente eseguirlo localmente da un terminale mentre accediamo a GCP da remoto.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

Tuttavia, lo eseguiremo utilizzando DataFlow. Possiamo farlo usando il comando seguente impostando i seguenti parametri richiesti.

  • project — ID del tuo progetto GCP.
  • runner è un runner di pipeline che analizzerà il tuo programma e costruirà la tua pipeline. Per eseguire nel cloud, è necessario specificare un DataflowRunner.
  • staging_location — il percorso verso l'archivio cloud Cloud Dataflow per l'indicizzazione dei pacchetti di codice necessari ai processori che eseguono il lavoro.
  • temp_location — percorso dell'archivio cloud Cloud Dataflow per l'archiviazione di file di lavoro temporanei creati durante l'esecuzione della pipeline.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

Mentre questo comando è in esecuzione, possiamo andare alla scheda DataFlow nella console di Google e visualizzare la nostra pipeline. Quando facciamo clic sulla pipeline, dovremmo vedere qualcosa di simile alla Figura 4. Per scopi di debug, può essere molto utile andare su Logs e poi su Stackdriver per visualizzare i log dettagliati. Ciò mi ha aiutato a risolvere i problemi della pipeline in numerosi casi.

Creiamo una pipeline di elaborazione dei dati in flusso. Parte 2
Figura 4: trasportatore di travi

Accedi ai nostri dati in BigQuery

Quindi, dovremmo già avere una pipeline in esecuzione con i dati che fluiscono nella nostra tabella. Per testarlo, possiamo andare su BigQuery ed esaminare i dati. Dopo aver utilizzato il comando seguente dovresti vedere le prime righe del set di dati. Ora che abbiamo i dati archiviati in BigQuery, possiamo condurre ulteriori analisi, nonché condividere i dati con i colleghi e iniziare a rispondere alle domande aziendali.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

Creiamo una pipeline di elaborazione dei dati in flusso. Parte 2
Figura 5: BigQuery

conclusione

Ci auguriamo che questo post serva come utile esempio di creazione di una pipeline di dati in streaming e di ricerca di modi per rendere i dati più accessibili. La memorizzazione dei dati in questo formato ci offre molti vantaggi. Ora possiamo iniziare a rispondere a domande importanti come: quante persone utilizzano il nostro prodotto? La tua base utenti cresce nel tempo? Con quali aspetti del prodotto interagiscono maggiormente le persone? E ci sono errori dove non dovrebbero esserci? Queste sono le domande che interesseranno l'organizzazione. Sulla base degli insight che emergono dalle risposte a queste domande, possiamo migliorare il prodotto e aumentare il coinvolgimento degli utenti.

Beam è davvero utile per questo tipo di esercizi e ha anche una serie di altri casi d'uso interessanti. Ad esempio, potresti voler analizzare i dati dei ticchettii azionari in tempo reale ed effettuare operazioni basate sull'analisi, forse disponi di dati di sensori provenienti dai veicoli e desideri calcolare i calcoli del livello di traffico. Potresti anche, ad esempio, essere un'azienda di giochi che raccoglie i dati degli utenti e li utilizza per creare dashboard per tenere traccia delle metriche chiave. Ok, signori, questo è un argomento per un altro post, grazie per la lettura e per chi vuole vedere il codice completo, di seguito c'è il collegamento al mio GitHub.

https://github.com/DFoly/User_log_pipeline

Questo è tutto. Leggi la prima parte.

Fonte: habr.com

Aggiungi un commento