Ciao a tutti. Condividiamo la traduzione della parte finale dell'articolo, preparata appositamente per gli studenti del corso.
Apache Beam e DataFlow per pipeline in tempo reale
Configurazione di Google Cloud
Nota: ho utilizzato Google Cloud Shell per eseguire la pipeline e pubblicare dati di log personalizzati perché avevo problemi con l'esecuzione della pipeline in Python 3. Google Cloud Shell utilizza Python 2, che è più coerente con Apache Beam.
Per avviare la pipeline, dobbiamo scavare un po' nelle impostazioni. Quelli di voi che non hanno mai utilizzato GCP prima, dovranno seguire i seguenti 6 passaggi descritti in questo
Successivamente, dovremo caricare i nostri script su Google Cloud Storage e copiarli sul nostro Google Cloud Shel. Il caricamento nell'archivio cloud è piuttosto banale (è possibile trovare una descrizione
Figura 2
Di seguito sono elencati i comandi necessari per copiare i file e installare le librerie richieste.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creazione del nostro database e tabella
Una volta completati tutti i passaggi relativi alla configurazione, la prossima cosa che dobbiamo fare è creare un set di dati e una tabella in BigQuery. Esistono diversi modi per farlo, ma il più semplice è utilizzare la console Google Cloud creando prima un set di dati. Puoi seguire i passaggi seguenti
Figura 3. Disposizione della tabella
Pubblicazione dei dati di registro utente
Pub/Sub è un componente fondamentale della nostra pipeline perché consente a più applicazioni indipendenti di comunicare tra loro. In particolare, funziona come un intermediario che ci permette di inviare e ricevere messaggi tra applicazioni. La prima cosa che dobbiamo fare è creare un argomento. Basta andare su Pub/Sub nella console e fare clic su CREA ARGOMENTO.
Il codice seguente richiama il nostro script per generare i dati di log definiti sopra, quindi si connette e invia i log a Pub/Sub. L'unica cosa che dobbiamo fare è creare un oggetto Cliente editore, specificare il percorso dell'argomento utilizzando il metodo topic_path
e chiamare la funzione publish
с topic_path
e dati. Tieni presente che importiamo generate_log_line
dalla nostra sceneggiatura stream_logs
, quindi assicurati che questi file siano nella stessa cartella, altrimenti riceverai un errore di importazione. Possiamo quindi eseguirlo tramite la nostra console Google utilizzando:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Non appena il file verrà eseguito, potremo vedere l'output dei dati di registro sulla console, come mostrato nella figura seguente. Questo script funzionerà finché non lo utilizzeremo CTRL + Cper completarlo.
Figura 4. Uscita publish_logs.py
Scrivere il codice della nostra pipeline
Ora che abbiamo tutto preparato, possiamo iniziare la parte divertente: codificare la nostra pipeline utilizzando Beam e Python. Per creare una pipeline Beam, dobbiamo creare un oggetto pipeline (p). Una volta creato un oggetto pipeline, possiamo applicare più funzioni una dopo l'altra utilizzando l'operatore pipe (|)
. In generale, il flusso di lavoro è simile all'immagine seguente.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Nel nostro codice creeremo due funzioni personalizzate. Funzione regex_clean
, che esegue la scansione dei dati e recupera la riga corrispondente in base all'elenco PATTERNS utilizzando la funzione re.search
. La funzione restituisce una stringa separata da virgole. Se non sei un esperto di espressioni regolari, ti consiglio di dare un'occhiata a questo datetime
all'interno di una funzione per farla funzionare. Ricevevo un errore di importazione all'inizio del file, il che era strano. Questo elenco viene quindi passato alla funzione WriteToBigQuery, che aggiunge semplicemente i nostri dati alla tabella. Di seguito è riportato il codice per il lavoro Batch DataFlow e il lavoro Streaming DataFlow. L'unica differenza tra codice batch e codice streaming è che in batch leggiamo il CSV src_path
utilizzando la funzione ReadFromText
da Fascio.
Lavoro DataFlow batch (elaborazione batch)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming processo DataFlow (elaborazione del flusso)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Avvio del trasportatore
Possiamo eseguire la pipeline in diversi modi. Se volessimo, potremmo semplicemente eseguirlo localmente da un terminale mentre accediamo a GCP da remoto.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Tuttavia, lo eseguiremo utilizzando DataFlow. Possiamo farlo usando il comando seguente impostando i seguenti parametri richiesti.
project
— ID del tuo progetto GCP.runner
è un runner di pipeline che analizzerà il tuo programma e costruirà la tua pipeline. Per eseguire nel cloud, è necessario specificare un DataflowRunner.staging_location
— il percorso verso l'archivio cloud Cloud Dataflow per l'indicizzazione dei pacchetti di codice necessari ai processori che eseguono il lavoro.temp_location
— percorso dell'archivio cloud Cloud Dataflow per l'archiviazione di file di lavoro temporanei creati durante l'esecuzione della pipeline.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Mentre questo comando è in esecuzione, possiamo andare alla scheda DataFlow nella console di Google e visualizzare la nostra pipeline. Quando facciamo clic sulla pipeline, dovremmo vedere qualcosa di simile alla Figura 4. Per scopi di debug, può essere molto utile andare su Logs e poi su Stackdriver per visualizzare i log dettagliati. Ciò mi ha aiutato a risolvere i problemi della pipeline in numerosi casi.
Figura 4: trasportatore di travi
Accedi ai nostri dati in BigQuery
Quindi, dovremmo già avere una pipeline in esecuzione con i dati che fluiscono nella nostra tabella. Per testarlo, possiamo andare su BigQuery ed esaminare i dati. Dopo aver utilizzato il comando seguente dovresti vedere le prime righe del set di dati. Ora che abbiamo i dati archiviati in BigQuery, possiamo condurre ulteriori analisi, nonché condividere i dati con i colleghi e iniziare a rispondere alle domande aziendali.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figura 5: BigQuery
conclusione
Ci auguriamo che questo post serva come utile esempio di creazione di una pipeline di dati in streaming e di ricerca di modi per rendere i dati più accessibili. La memorizzazione dei dati in questo formato ci offre molti vantaggi. Ora possiamo iniziare a rispondere a domande importanti come: quante persone utilizzano il nostro prodotto? La tua base utenti cresce nel tempo? Con quali aspetti del prodotto interagiscono maggiormente le persone? E ci sono errori dove non dovrebbero esserci? Queste sono le domande che interesseranno l'organizzazione. Sulla base degli insight che emergono dalle risposte a queste domande, possiamo migliorare il prodotto e aumentare il coinvolgimento degli utenti.
Beam è davvero utile per questo tipo di esercizi e ha anche una serie di altri casi d'uso interessanti. Ad esempio, potresti voler analizzare i dati dei ticchettii azionari in tempo reale ed effettuare operazioni basate sull'analisi, forse disponi di dati di sensori provenienti dai veicoli e desideri calcolare i calcoli del livello di traffico. Potresti anche, ad esempio, essere un'azienda di giochi che raccoglie i dati degli utenti e li utilizza per creare dashboard per tenere traccia delle metriche chiave. Ok, signori, questo è un argomento per un altro post, grazie per la lettura e per chi vuole vedere il codice completo, di seguito c'è il collegamento al mio GitHub.
Questo è tutto.
Fonte: habr.com