Hola a tots. Compartim la traducció de la part final de l'article, elaborada específicament per als alumnes del curs.
Apache Beam i DataFlow per a canonades en temps real
Configuració de Google Cloud
Nota: he utilitzat Google Cloud Shell per executar la canalització i publicar dades de registre d'usuari perquè tenia problemes per executar la canalització a Python 3. Google Cloud Shell utilitza Python 2, que és més coherent amb Apache Beam.
Per iniciar el pipeline, hem d'aprofundir una mica en la configuració. Per a aquells de vosaltres que no heu utilitzat GCP abans, haureu de seguir els 6 passos següents que es descriuen en aquest
Després d'això, haurem de penjar els nostres scripts a Google Cloud Storage i copiar-los al nostre Google Cloud Shel. La càrrega a l'emmagatzematge al núvol és bastant trivial (es pot trobar una descripció
Figura 2
Les ordres que necessitem per copiar els fitxers i instal·lar les biblioteques necessàries es mostren a continuació.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creació de la nostra base de dades i taula
Un cop hàgim completat tots els passos relacionats amb la configuració, el següent que hem de fer és crear un conjunt de dades i una taula a BigQuery. Hi ha diverses maneres de fer-ho, però la més senzilla és utilitzar la consola de Google Cloud creant primer un conjunt de dades. Podeu seguir els passos següents
Figura 3. Distribució de la taula
Publicació de dades de registre d'usuari
Pub/Sub és un component crític del nostre pipeline perquè permet que diverses aplicacions independents es comuniquin entre elles. En concret, funciona com a intermediari que ens permet enviar i rebre missatges entre aplicacions. El primer que hem de fer és crear un tema. Simplement aneu a Pub/Sub a la consola i feu clic a CREAR TEMA.
El codi següent crida al nostre script per generar les dades de registre definides anteriorment i després connecta i envia els registres a Pub/Sub. L'únic que hem de fer és crear un objecte PublisherClient, especifiqueu el camí al tema mitjançant el mètode topic_path
i crida a la funció publish
с topic_path
i dades. Tingueu en compte que importem generate_log_line
del nostre guió stream_logs
, així que assegureu-vos que aquests fitxers estiguin a la mateixa carpeta, en cas contrari obtindreu un error d'importació. Aleshores podem executar-ho a través de la nostra consola de Google fent servir:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Tan bon punt s'executi el fitxer, podrem veure la sortida de les dades de registre a la consola, tal com es mostra a la figura següent. Aquest script funcionarà mentre no l'utilitzem CTRL + Cper completar-lo.
Figura 4. Sortida publish_logs.py
Escrivint el nostre codi de pipeline
Ara que ho tenim tot preparat, podem començar la part divertida: codificar la nostra canalització amb Beam i Python. Per crear una canalització Beam, hem de crear un objecte canalització (p). Un cop hem creat un objecte pipeline, podem aplicar múltiples funcions una darrere l'altra mitjançant l'operador pipe (|)
. En general, el flux de treball s'assembla a la imatge següent.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Al nostre codi, crearem dues funcions personalitzades. Funció regex_clean
, que escaneja les dades i recupera la fila corresponent en funció de la llista PATRONS mitjançant la funció re.search
. La funció retorna una cadena separada per comes. Si no sou un expert en expressió regular, us recomano que feu una ullada a això datetime
dins d'una funció perquè funcioni. Vaig rebre un error d'importació al principi del fitxer, que era estrany. A continuació, aquesta llista es passa a la funció WriteToBigQuery, que simplement afegeix les nostres dades a la taula. A continuació es mostra el codi del treball de flux de dades per lots i del treball de flux de dades en continu. L'única diferència entre el codi per lots i el de streaming és que en batch llegim el CSV src_path
utilitzant la funció ReadFromText
de Beam.
Treball de flux de dades per lots (processament per lots)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Treball de flux de dades en temps real (processament de flux)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Execució de la canonada
Podem executar el pipeline de diverses maneres diferents. Si volguéssim, podríem executar-lo localment des d'un terminal mentre iniciem sessió a GCP de forma remota.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Tanmateix, l'executarem amb DataFlow. Podem fer-ho mitjançant l'ordre següent establint els següents paràmetres necessaris.
project
— ID del vostre projecte GCP.runner
és un corredor de pipelines que analitzarà el vostre programa i construirà el vostre pipeline. Per executar-se al núvol, heu d'especificar un DataflowRunner.staging_location
— el camí a l'emmagatzematge en núvol de Cloud Dataflow per indexar els paquets de codi necessaris pels processadors que realitzen el treball.temp_location
— camí a l'emmagatzematge en núvol de Cloud Dataflow per emmagatzemar fitxers de treball temporals creats mentre s'executa la canalització.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Mentre s'executa aquesta ordre, podem anar a la pestanya DataFlow de la consola de Google i veure el nostre pipeline. Quan fem clic a la canalització, hauríem de veure alguna cosa semblant a la figura 4. Per a finalitats de depuració, pot ser molt útil anar a Registres i després a Stackdriver per veure els registres detallats. Això m'ha ajudat a resoldre problemes de canalització en diversos casos.
Figura 4: Transportador de biga
Accediu a les nostres dades a BigQuery
Per tant, ja hauríem de tenir un pipeline en funcionament amb dades que flueixin a la nostra taula. Per provar-ho, podem anar a BigQuery i mirar les dades. Després d'utilitzar l'ordre següent, hauríeu de veure les primeres files del conjunt de dades. Ara que tenim les dades emmagatzemades a BigQuery, podem fer més anàlisis, així com compartir-les amb els companys i començar a respondre preguntes empresarials.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figura 5: BigQuery
Conclusió
Esperem que aquesta publicació serveixi d'exemple útil per crear una canalització de dades en streaming, així com per trobar maneres de fer que les dades siguin més accessibles. Emmagatzemar dades en aquest format ens aporta molts avantatges. Ara podem començar a respondre preguntes importants com ara quantes persones fan servir el nostre producte? La vostra base d'usuaris està creixent amb el temps? Amb quins aspectes del producte interactuen més les persones? I hi ha errors on no n'hi hauria d'haver? Aquestes són les preguntes que seran d'interès per a l'organització. A partir dels coneixements que sorgeixen de les respostes a aquestes preguntes, podem millorar el producte i augmentar la implicació dels usuaris.
Beam és realment útil per a aquest tipus d'exercicis i també té una sèrie d'altres casos d'ús interessants. Per exemple, és possible que vulgueu analitzar les dades d'existències en temps real i fer operacions basades en l'anàlisi, potser teniu dades de sensors procedents de vehicles i voleu calcular els càlculs del nivell de trànsit. També podríeu ser, per exemple, una empresa de jocs que recopila dades dels usuaris i les utilitza per crear taulers de control per fer un seguiment de mètriques clau. D'acord, senyors, aquest és un tema per a una altra publicació, gràcies per llegir-lo, i per a aquells que vulguin veure el codi complet, a continuació teniu l'enllaç al meu GitHub.
Això és tot.
Font: www.habr.com