Hello everyone. We are sharing the translation of the final part of the article, prepared especially for students of the course.
Apache Beam and DataFlow for Real-Time Pipelines
Setting up Google Cloud
Note: I used Google Cloud Shell to run the pipeline and publish the user log data because I had problems running the pipeline in Python 3. Google Cloud Shell uses Python 2, which plays a bit nicer with Apache Beam.
To get the pipeline running, we need to dig a little into the settings. If you have not used GCP before, you will need to go through the 6 steps outlined on this page.
After this, we need to upload our scripts to Google Cloud Storage and copy them to our Google Cloud Shell. Uploading to cloud storage is fairly trivial (a description is available).
Figure 2
The commands we need to copy the files and install the required libraries are listed below.
# Copy the files from Cloud Storage into the current directory
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creating our dataset and table
Once all the setup steps are complete, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console and create the dataset first. You can follow the steps below.
Figure 3. Table layout
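The pipeline code later in this article describes the table with a schema string of the form name:TYPE, name:TYPE. As a rough sketch of what that string contains, the snippet below turns it into a list of field dictionaries in the name/type/mode layout that BigQuery JSON schema files use. The helper name schema_to_fields and the NULLABLE mode are assumptions made for illustration, not something the original article defines.

```python
import json

# Schema string as it appears in the pipeline code of this article.
SCHEMA = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, '
          'status:STRING, body_bytes_sent:STRING, http_referer:STRING, '
          'http_user_agent:STRING')


def schema_to_fields(schema):
    """Turn 'name:TYPE, name:TYPE' into a list of field dictionaries."""
    fields = []
    for column in schema.split(","):
        name, field_type = column.strip().split(":")
        # NULLABLE is an assumed default mode for every column.
        fields.append({"name": name, "type": field_type, "mode": "NULLABLE"})
    return fields


fields = schema_to_fields(SCHEMA)
print(len(fields))
print(json.dumps(fields, indent=2))
```

Printing the result makes it easy to compare the seven columns against the table you create in the console.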
Publishing user log data
Pub/Sub is a vital component of our pipeline because it allows multiple independent applications to communicate with each other. In particular, it acts as an intermediary that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
The code below calls our script to generate the log data described above, then connects to Pub/Sub and sends the logs. All we need to do is create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this from our google console using:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time

PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)


def publish(publisher, topic, message):
    # Pub/Sub payloads must be bytes, so encode the log line first.
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)


def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':
    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)
        # Pause for 1 or 2 seconds between messages.
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
Once the file is running, we will be able to see the log data being printed to the console, as shown in the figure below. The script will keep running until we press CTRL+C to stop it.
Figure 4. Output of publish_logs.py
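The publishing script relies on a future with a done-callback to report success or failure. The sketch below shows the same pattern using only the standard library's concurrent.futures, so it can be tried without a GCP project. The fake_publish function is a hypothetical stand-in for the real publish call and is not part of the Pub/Sub client.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_publish(data):
    """Hypothetical stand-in for a publish call: returns an id or raises."""
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes")
    return "msg-{}".format(len(data))


results = []


def callback(future):
    # exception() returns None on success, otherwise the raised exception.
    error = future.exception(timeout=30)
    if error is not None:
        results.append("failed: {}".format(error))
    else:
        results.append("published: {}".format(future.result()))


# A single worker keeps the two submissions in order.
with ThreadPoolExecutor(max_workers=1) as pool:
    ok = pool.submit(fake_publish, "GET /home".encode("utf-8"))
    ok.add_done_callback(callback)
    bad = pool.submit(fake_publish, "not bytes")
    bad.add_done_callback(callback)

print(results)
```

The second submission skips the encode step on purpose, which shows why the script encodes each log line to UTF-8 bytes before publishing.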
Writing our pipeline code
Now that everything is set up, we can start the fun part: coding our pipeline using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have the pipeline object, we can apply multiple functions one after the other using the pipe operator (|). In general, the workflow looks like the outline below.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
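To make the chaining syntax less mysterious, here is a toy model of it in plain Python. It is not Beam's implementation: the Collection and Transform classes and the Map and Filter helpers are invented for illustration and work on in-memory lists. It only shows how operator overloading can support both collection | transform and 'Label' >> transform.

```python
class Transform:
    """Toy stand-in for a Beam transform: wraps a function over a list."""

    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label

    def __rrshift__(self, label):
        # Supports the 'Label' >> transform syntax.
        return Transform(self.fn, label)


class Collection:
    """Toy stand-in for a PCollection: holds an in-memory list."""

    def __init__(self, items):
        self.items = list(items)

    def __or__(self, transform):
        # collection | transform applies the transform to every stored item.
        return Collection(transform.fn(self.items))


def Map(fn):
    return Transform(lambda items: [fn(x) for x in items])


def Filter(predicate):
    return Transform(lambda items: [x for x in items if predicate(x)])


lines = Collection(["a,1", "b,2", "c,3"])
result = (lines
          | 'Split' >> Map(lambda s: s.split(","))
          | 'KeepBig' >> Filter(lambda row: int(row[1]) >= 2)
          | 'Names' >> Map(lambda row: row[0]))
print(result.items)  # ['b', 'c']
```

Because >> binds more tightly than |, each label attaches to its transform before the transform is applied to the collection.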
In our code, we will create two custom functions. The first, regex_clean, scans the data and extracts the matching part of each line based on the PATTERNS list using re.search. The function returns a comma-separated string. If you are not a regular expression expert, I recommend reading up on them first. The second is Split, a class that inherits from Beam's DoFn; it splits that string and returns a list containing one dictionary keyed by the column names of our BigQuery table. I had to import datetime inside the function for it to work; importing it at the top of the file gave me an import error, which was odd. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and streaming code is that in the batch job we read the CSV from src_path using Beam's ReadFromText function.
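To see what regex_clean produces, it helps to run the patterns against a single line outside of Beam. The sketch below uses the patterns from the article, written with their backslash escapes, on a hypothetical log line. The real line format comes from stream_logs.py, which is not shown here, so the sample line is an assumption chosen only so that every pattern finds a match.

```python
import re

# Patterns from the article, written with their backslash escapes.
PATTERNS = [
    r'(^\S+\.[\S+\.]+\S+)\s',                  # leading address
    r'(?<=\[).+?(?=\])',                       # first bracketed value
    r'\"(\S+)\s(\S+)\s*(\S*)\"',               # quoted request
    r'\s(\d+)\s',                              # first number between whitespace
    r'(?<=\[).\d+(?=\])',                      # bracketed number
    r'\"[A-Z][a-z]+',                          # quoted capitalised word
    r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+',  # quoted URL
]


def regex_clean(line):
    fields = []
    for pattern in PATTERNS:
        match = re.search(pattern, line)
        # Use a blank placeholder when a pattern finds nothing.
        fields.append(match.group() if match else " ")
    return ",".join(f.strip().replace('"', "") for f in fields)


# Hypothetical log line, assumed for illustration only.
SAMPLE = ('192.168.1.10 - - [12/Mar/2019:10:11:12] "GET /products HTTP/1.1" '
          '[200] 4523 "https://www.example.com" "Mozilla/5.0"')

cleaned = regex_clean(SAMPLE)
print(cleaned)
```

The output order follows the order of PATTERNS, not the order of the fields in the line, so it is worth checking that each position lines up with the column you expect before writing to BigQuery.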
Batch DataFlow Job (batch processing)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT = 'user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"


def regex_clean(data):
    # One pattern per output field, in the order the fields are joined.
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for pattern in PATTERNS:
        reg_match = re.search(pattern, data)
        # Keep a placeholder for a missing field so column positions stay aligned.
        result.append(reg_match.group() if reg_match else " ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        # Imported here because a module-level import failed on the workers.
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        # NOTE: the streaming listing maps element[3] and element[4] the other
        # way round; check which order matches your log lines.
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]


def main():
    p = beam.Pipeline(options=PipelineOptions())

    (p
     | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
     | "clean address" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
     )

    # Block until the batch job has finished.
    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
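The timestamp handling inside Split can be tried on its own. This is a minimal sketch of just that step, using the same strptime and strftime format strings as the listing above. The helper name normalize_timestamp and the sample row are assumptions made for illustration.

```python
from datetime import datetime


def normalize_timestamp(row):
    """Rewrite the second comma-separated field as 'YYYY-MM-DD HH:MM:SS'."""
    parts = row.split(",")
    parsed = datetime.strptime(parts[1], "%d/%b/%Y:%H:%M:%S")
    parts[1] = parsed.strftime("%Y-%m-%d %H:%M:%S")
    return parts


# Hypothetical cleaned row, assumed for illustration only.
row = "192.168.1.10,12/Mar/2019:10:11:12,GET /products HTTP/1.1"
print(normalize_timestamp(row)[1])
```

Note that %b parses abbreviated month names according to the current locale, so a row such as this one assumes an English or C locale on the machine running the code.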
Streaming DataFlow Job
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re

PROJECT = "user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):
    # One pattern per output field, in the order the fields are joined.
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for pattern in PATTERNS:
        reg_match = re.search(pattern, data)
        # Keep a placeholder for a missing field so column positions stay aligned.
        result.append(reg_match.group() if reg_match else " ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        # Imported here because a module-level import failed on the workers.
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    # parse_known_args returns (known_args, remaining_args); the rest go to Beam.
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (p
     # Fall back to the hard-coded TOPIC when --input_topic is not given.
     | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic or TOPIC).with_output_types(bytes)
     | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
     | "Clean Data" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
     )

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Running the pipeline
We can run the pipeline in several different ways. If we wanted to, we could run it locally from a terminal, provided we are remotely logged in to GCP.
python main_pipeline_stream.py \
  --input_topic "projects/user-logs-237110/topics/userlogs" \
  --streaming
However, we are going to run it using DataFlow. We can do this with the command below by setting the following required parameters.
project - the ID of your GCP project.
runner - the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
staging_location - the Cloud Storage path where Cloud Dataflow stages the code packages needed by the workers executing the job.
temp_location - the Cloud Storage path where Cloud Dataflow stores temporary job files created while the pipeline runs.
streaming - runs the pipeline in streaming mode.
python main_pipeline_stream.py \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp \
  --staging_location gs://$BUCKET/staging \
  --streaming
While this command is running, we can go to the DataFlow tab in the google console and view our pipeline. When we click on the pipeline, we should see something similar to Figure 4. For debugging purposes, it can be very helpful to go to Logs and then Stackdriver to view the detailed logs. This has helped me resolve pipeline issues in a number of cases.
Figure 4: Beam pipeline
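The run commands mix flags that belong to the script (such as --input_topic) with flags meant for the runner (such as --runner and --streaming). A common way to separate the two is argparse's parse_known_args, which returns the recognised arguments together with a list of everything else. The sketch below shows that split in isolation; the helper name split_args is an assumption made for illustration.

```python
import argparse


def split_args(argv):
    """Separate the script's own flags from the ones left for the runner."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    # Unrecognised flags are returned in the second value, not rejected.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


known, remaining = split_args([
    "--input_topic", "projects/user-logs-237110/topics/userlogs",
    "--runner", "DataflowRunner",
    "--streaming",
])
print(known.input_topic)
print(remaining)
```

The remaining list is what would typically be handed to PipelineOptions, so runner settings never need to be declared in the script's own parser.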
Accessing our data in BigQuery
We should now have a pipeline running with data flowing into our table. To test this, we can go to BigQuery and look at the data. After running the query below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can do further analysis, share the data with colleagues, and start answering business questions.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5: BigQuery
Conclusion
We hope this post serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this format gives us many advantages. We can now start answering important questions such as: how many people use our product? Is the user base growing over time? Which parts of the product do people interact with most? And are errors occurring where there should not be any? These are the questions the organization will care about. Based on the insights that come from the answers, we can improve the product and increase user engagement.
Beam is really useful for this type of exercise and has a number of other interesting use cases as well. For example, you may want to analyze stock tick data in real time and make trades based on the analysis, or perhaps you have sensor data coming in from vehicles and want to calculate traffic levels. You could also, for example, be a gaming company collecting user data and using it to build dashboards that track key metrics. Okay, folks, that is a topic for another post. Thanks for reading, and for those who want to see the full code, the link to my GitHub is below.
That is all.
Source: www.habr.com