Hi everyone. We're sharing a translation of the second half of the article, prepared especially for students of the course.
Apache Beam and DataFlow for Real-Time Pipelines
Setting up Google Cloud
Note: I used Google Cloud Shell to run the pipeline and publish the user log data because I had problems running the pipeline in Python 3. Google Cloud Shell uses Python 2, which plays better with Apache Beam.
To get the pipeline up and running, we need to dig into configuration a bit. For those of you who haven't used GCP before, you'll need to follow the 6 steps described here.
After that, we need to upload our scripts to Google Cloud Storage and copy them into our Google Cloud Shell. Uploading to a storage bucket is fairly trivial (a description can be found here).
The commands we need to copy the files and install the required libraries are listed below.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creating our dataset and table
Once we've completed the configuration steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the easiest is to use the Google Cloud console, creating the dataset first. You can follow the steps below to create a table and a schema.
Figure 3. Table schema
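The table's fields should match the comma-separated schema string that the pipeline code passes to WriteToBigQuery further down. A quick plain-Python sanity check of that string (the schema itself is copied from the pipeline code below):

```python
# Schema string copied from the pipeline code; WriteToBigQuery expects
# "name:TYPE" pairs separated by commas.
schema = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, '
          'status:STRING, body_bytes_sent:STRING, http_referer:STRING, '
          'http_user_agent:STRING')

# Parse it into (field_name, field_type) pairs.
fields = [f.strip().split(':') for f in schema.split(',')]
for name, ftype in fields:
    print(name, ftype)
```

The field names and types printed here are exactly what the BigQuery table created in the console needs to contain.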
Publishing the user log data
Pub/Sub is a critical component of our pipeline because it allows multiple independent applications to interact with each other. Specifically, it works as an intermediary that lets us send and receive messages between applications. The first thing we need to do is create a topic. Just go to Pub/Sub in the console and click CREATE TOPIC.
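A topic is addressed by a fully qualified path of the form `projects/<project-id>/topics/<topic-name>`; the client's `topic_path()` helper just assembles this string. A plain-Python equivalent, using the project and topic IDs from the scripts below:

```python
# Project and topic IDs used in publish.py below.
PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"

# Equivalent of pubsub_v1.PublisherClient().topic_path(PROJECT_ID, TOPIC).
topic_path = "projects/{}/topics/{}".format(PROJECT_ID, TOPIC)
print(topic_path)  # projects/user-logs-237110/topics/userlogs
```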
The code below calls our script to generate the log data described above, then connects to Pub/Sub and sends the logs there. The only things we need to do are create a PublisherClient object, build the path to the topic with topic_path, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder or you'll get an import error. We can then run this in our Google console with:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time

PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)


def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)


def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':
    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
As soon as the file is running, we can watch the log data being printed to the console, as shown in the figure below. This script will keep running until we use CTRL+C to terminate it.
Figure 4. publish_logs.py output
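Note that publish() hands each log line to Pub/Sub as UTF-8 bytes, which is why the streaming pipeline later needs a "Decode" step. The round trip in isolation (the sample line is made up for illustration):

```python
# A made-up log line standing in for generate_log_line() output.
line = '127.0.0.1 - - [14/May/2019:12:34:56] "GET /home HTTP/1.1" 200'

data = line.encode('utf-8')      # what publish() sends to Pub/Sub
decoded = data.decode('utf-8')   # what beam.Map(lambda x: x.decode('utf-8')) yields
print(decoded == line)  # True
```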
Writing our pipeline code
Now that we have everything prepared, we can get to the fun part: coding our pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we've created the pipeline object, we can apply multiple transforms one after another using the pipe operator (|). In general, the workflow looks like the figure below.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
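To build intuition for how the pipe operator chains transforms, here is a toy stand-in (plain Python, not Beam itself): each "transform" is just a function, and | feeds the current value into it.

```python
class Chain:
    """Toy stand-in for a Beam PCollection: `chain | fn` applies fn."""

    def __init__(self, value):
        self.value = value

    def __or__(self, transform):
        return Chain(transform(self.value))


# Mimics [Initial Input] | [First Transform] | [Second Transform] | [Third Transform]
result = (Chain([' 200 ', ' 404 ', ' 500 '])
          | (lambda rows: [r.strip() for r in rows])       # clean
          | (lambda rows: [int(r) for r in rows])          # parse
          | (lambda rows: [r for r in rows if r < 500]))   # filter

print(result.value)  # [200, 404]
```

In real Beam the transforms are PTransform objects and the values are distributed PCollections, but the chaining pattern reads the same way.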
In our code, we create two custom pieces of logic. The regex_clean function scans the data and, for each pattern in the PATTERNS list, extracts the matching substring using re.search, returning a comma-separated string; if you are not a regular-expression expert, I recommend checking out this tutorial. The Split DoFn then converts that comma-separated string into a dictionary row. Note that we import datetime inside the function for it to work; I was getting an import error at the top of the file, which was odd. The resulting list is passed to the WriteToBigQuery function, which simply appends our data to the table. The code for both a batch DataFlow job and a streaming DataFlow job is given below. The only difference between the batch and streaming code is that in the batch version we read the CSV from src_path using Beam's ReadFromText function.
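The timestamp conversion performed inside the Split DoFn can be checked in isolation with the standard library (the sample timestamp is made up for illustration):

```python
from datetime import datetime

# Same conversion Split.process applies to the second CSV field:
# Nginx-style "dd/Mon/YYYY:HH:MM:SS" -> BigQuery-friendly "YYYY-MM-DD HH:MM:SS".
raw = "14/May/2019:12:34:56"
d = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
print(date_string)  # 2019-05-14 12:34:56
```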
Batch DataFlow Job (batch processing)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT = 'user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"


def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'"(\S+)\s(\S+)\s*(\S*)"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]


def main():
    p = beam.Pipeline(options=PipelineOptions())

    (p
        | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
        | "clean address" >> beam.Map(regex_clean)
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Streaming DataFlow Job (stream processing)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re

PROJECT = "user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'"(\S+)\s(\S+)\s*(\S*)"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions())

    (p
        | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
        | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
        | "Clean Data" >> beam.Map(regex_clean)
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Launching the pipeline
We can launch the pipeline in several different ways. If we wanted to, we could just run it locally from a terminal while logged in to GCP remotely:
python main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
However, we are going to run it using DataFlow. We can do so with the command below, setting the following required parameters:
project — the ID of your GCP project.
runner — the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
staging_location — the path to Cloud Dataflow cloud storage for staging the code packages needed by the workers executing the job.
temp_location — the path to Cloud Dataflow cloud storage for storing temporary job files created while the pipeline runs.
streaming — a flag that tells Beam to run the pipeline in streaming mode.
python main_pipeline_stream.py
--runner DataflowRunner
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
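Note that main() parses its arguments with argparse's parse_known_args, so flags it does not declare (such as --runner and --streaming above) are left over for Beam's PipelineOptions to consume. A minimal stdlib illustration of that split (the argument values are examples only):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")

# Flags the parser doesn't know about are returned, not rejected.
known, leftover = parser.parse_known_args([
    "--input_topic", "projects/user-logs-237110/topics/userlogs",
    "--runner", "DataflowRunner",
    "--streaming",
])
print(known.input_topic)  # projects/user-logs-237110/topics/userlogs
print(leftover)           # ['--runner', 'DataflowRunner', '--streaming']
```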
While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. When we click on the pipeline, we should see something like Figure 4. For debugging, it can be very helpful to go to Logs and then to Stackdriver to view the detailed logs. This has helped me resolve pipeline issues in a number of cases.
Figure 4: The DataFlow pipeline
Accessing our data in BigQuery
So, we should now have a pipeline running with data flowing into our table. To test this, we can go to BigQuery and view the data. After running the query below, you should see the first few rows of the dataset. Now that we have the data stored in BigQuery, we can carry out further analysis, share the data with colleagues, and start answering business questions.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5: BigQuery
Conclusion
We hope this post serves as a useful example of building a streaming data pipeline, and of finding ways to make data more accessible. Storing data in this format gives us many advantages. We can now start answering important questions such as: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with most? And are there errors where there shouldn't be? These are the questions that matter to the organization, and based on the insights that emerge from their answers, we can improve the product and increase user engagement.
Beam is genuinely useful for this type of exercise, and there are a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on that analysis, or you might have sensor data coming in from vehicles and want to compute traffic-level metrics. You could also, say, be a gaming company collecting user data and using it to build dashboards that track key metrics. OK folks, that's a topic for another post. Thanks for reading, and for those who want to see the full code, the link to my GitHub is below.
That's all.
Source: www.habr.com