Kaixo guztioi. Artikuluaren azken zatiaren itzulpena partekatzen ari gara, ikastaroko ikasleentzat espresuki prestatua.
Apache Beam eta DataFlow denbora errealeko kanalizazioetarako
Google Cloud konfiguratzen
Oharra: Google Cloud Shell erabili dut kanalizazioa exekutatzeko eta erregistro pertsonalizatuko datuak argitaratzeko, arazoak izan nituelako kanalizazioa Python 3-n exekutatzeko. Google Cloud Shell-ek Python 2 erabiltzen du, Apache Beam-ekin koherenteagoa dena.
Hodibidea hasteko, ezarpenetan pixka bat sakondu behar dugu. Aurretik GCP erabili ez duzuenontzat, honako 6 urrats hauek jarraitu beharko dituzue.
Horren ondoren, gure scriptak Google Cloud Storage-ra igo beharko ditugu eta gure Google Cloud Shel-era kopiatu beharko ditugu. Hodeiko biltegiratzera igotzea nahiko hutsala da (deskribapen bat aurki daiteke
2 irudia
Fitxategiak kopiatzeko eta beharrezko liburutegiak instalatzeko behar ditugun komandoak behean ageri dira.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Gure datu-basea eta taula sortzea
Behin konfigurazioarekin lotutako urrats guztiak amaituta, egin behar dugun hurrengo gauza BigQuery-n datu-multzo eta taula bat sortzea da. Horretarako hainbat modu daude, baina errazena Google Cloud kontsola erabiltzea da lehenik datu multzo bat sortuz. Beheko urratsak jarraitu ditzakezu
3. Irudia Taularen diseinua
Erabiltzaileen erregistro-datuak argitaratzea
Pub/Sub gure kanalizazioaren osagai kritikoa da, hainbat aplikazio independente elkarren artean komunikatzeko aukera ematen duelako. Bereziki, bitartekari gisa funtzionatzen du, aplikazioen artean mezuak bidali eta jasotzeko aukera ematen diguna. Egin behar dugun lehenengo gauza gai bat sortzea da. Besterik gabe, joan kontsolako Pub/Sub-era eta egin klik SORTU GAIA.
Beheko kodeak gure script-a deitzen du goian definitutako erregistro-datuak sortzeko eta gero erregistroak konektatu eta bidaltzen ditu Pub/Sub-era. Egin behar dugun gauza bakarra objektu bat sortzea da PublisherClient, zehaztu gaiaren bidea metodoa erabiliz topic_path
eta deitu funtzioari publish
Ρ topic_path
eta datuak. Kontuan izan inportatzen dugula generate_log_line
gure gidoitik stream_logs
, beraz, ziurtatu fitxategi hauek karpeta berean daudela, bestela inportazio-errore bat jasoko duzu. Ondoren, gure google kontsolaren bidez exekutatu dezakegu:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Fitxategia exekutatu bezain laster, erregistroko datuen irteera kontsolara ikusi ahal izango dugu, beheko irudian ikusten den moduan. Script honek funtzionatuko du erabiltzen ez dugun bitartean CTRL + Cosatzeko.
4. irudia. Irteera publish_logs.py
Gure kanalizazioaren kodea idazten
Dena prestatuta daukagunez, zati dibertigarriari ekin diezaiokegu: Beam eta Python erabiliz gure kanalizazioa kodetzen. Beam kanalizazioa sortzeko, kanalizazio objektu bat sortu behar dugu (p). Pipeline objektu bat sortu ondoren, hainbat funtzio aplika ditzakegu bata bestearen atzetik operadorea erabiliz pipe (|)
. Oro har, lan-fluxuak beheko irudiaren itxura du.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Gure kodean, bi funtzio pertsonalizatu sortuko ditugu. Funtzioa regex_clean
, datuak eskaneatzen dituena eta dagokion errenkada berreskuratzen duena, PATTERNS zerrendan oinarrituta funtzioa erabiliz re.search
. Funtzioak komaz bereizitako kate bat itzultzen du. Adierazpen erregular aditua ez bazara, hau egiaztatzea gomendatzen dizut datetime
funtzio baten barruan funtziona dezan. Fitxategiaren hasieran inportazio-errore bat jasotzen ari nintzen, arraroa. Zerrenda hau funtziora pasatzen da WriteToBigQuery, gure datuak taulara gehitzen dituena. Batch DataFlow Job eta Streaming DataFlow Lanaren kodea behean ematen da. Batch eta streaming kodearen arteko desberdintasun bakarra batch-ean CSV irakurtzen dugula da src_path
funtzioa erabiliz ReadFromText
Beam-etik.
Batch DataFlow lana (batch prozesatzea)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow lana (korronteen prozesatzea)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Zinta garraiatzailea abiaraztea
Hainbat modutan exekutatu dezakegu kanalizazioa. Nahi izanez gero, lokalean exekutatu genezake terminal batetik GCPn urrunetik saioa hasten dugun bitartean.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Hala ere, DataFlow erabiliz exekutatuko dugu. Hau egin dezakegu beheko komandoa erabiliz eskatzen diren parametro hauek ezarriz.
project
β Zure GCP proiektuaren IDa.runner
zure programa aztertu eta zure kanalizazioa eraikiko duen kanalizazioa da. Hodeian exekutatzeko, DataflowRunner bat zehaztu behar duzu.staging_location
β lana egiten duten prozesadoreek behar dituzten kode paketeak indexatzeko Cloud Dataflow hodeiko biltegirako bidea.temp_location
β Hodeiko Dataflow hodeiko biltegirako bidea kanalizazioa martxan dagoen bitartean sortutako aldi baterako lan-fitxategiak gordetzeko.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Komando hau exekutatzen ari den bitartean, google kontsolako DataFlow fitxara joan eta gure kanalizazioa ikus dezakegu. Hodietan klik egiten dugunean, 4. irudiaren antzeko zerbait ikusi beharko genuke. Arazketa-helburuetarako, oso lagungarria izan daiteke Logs-era joatea eta gero Stackdriver-era erregistro zehatzak ikusteko. Honek kanalizazio-arazoak konpontzen lagundu dit hainbat kasutan.
4. Irudia: Habe-garraiatzailea
Sartu gure datuak BigQuery-n
Beraz, dagoeneko kanalizazio bat izan beharko genuke gure taulara sartzen diren datuekin. Hau probatzeko, BigQuery-ra joan eta datuak begiratu ditzakegu. Beheko komandoa erabili ondoren datu-multzoaren lehen errenkadak ikusi beharko dituzu. Datuak BigQuery-n gordeta dauzkagunean, azterketa gehiago egin ditzakegu, baita datuak lankideekin partekatu eta negozioaren galderei erantzuten hasi.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
5. irudia: BigQuery
Ondorioa
Espero dugu argitalpen hau zuzeneko datuen kanalizazioa sortzeko adibide erabilgarria izatea, baita datuak eskuragarriago bihurtzeko moduak aurkitzeko ere. Datuak formatu honetan gordetzeak abantaila asko ematen dizkigu. Orain has gaitezke galdera garrantzitsuei erantzuten, esate baterako, zenbat pertsonek erabiltzen dute gure produktua? Zure erabiltzaile-basea hazten al da denborarekin? Produktuaren zein alderdirekin elkarreragiten du jendeak gehien? Eta egon behar ez den lekuan akatsak daude? Hauek dira erakundearentzat interesgarriak izango diren galderak. Galdera hauen erantzunetatik ateratzen diren ikuspegietan oinarrituta, produktua hobetu eta erabiltzaileen konpromisoa areagotu dezakegu.
Beam oso erabilgarria da ariketa mota honetarako eta beste erabilera kasu interesgarri batzuk ere baditu. Esate baterako, baliteke stock tick datuak denbora errealean aztertu eta analisian oinarritutako merkataritzak egin nahi izatea, agian ibilgailuetatik datozen sentsore datuak dituzu eta trafiko-mailaren kalkuluak kalkulatu nahi dituzu. Gainera, adibidez, erabiltzaileen datuak biltzen dituen joko-enpresa bat izan zaitezke eta aginte-panelak sortzeko erabiltzen dituen neurketa gakoen jarraipena egiteko. Ados, jaunak, hau beste mezu baterako gaia da, eskerrik asko irakurtzeagatik, eta kode osoa ikusi nahi dutenentzat, behean dago nire GitHub-erako esteka.
Hori da dena.
Iturria: www.habr.com