Hello, dear readers!
In this part we will build a real-time streaming data pipeline with Apache Beam and DataFlow and get it running on Google Cloud.
To get started, we will use Google Cloud Shell to set up our environment and deploy our pipeline. One thing to note: although we would rather write the pipeline in Python 3, Google Cloud Shell uses Python 2, which at the time of writing gets along better with Apache Beam.
To get the pipeline up and running, we first need to install the libraries listed below.
Note that if you have never used GCP before, you will need to complete the 6 setup steps described in this post. After that, we need to copy our files over from Google Cloud Storage and unpack them.
Google Cloud Shell gives us a terminal in which all of this is quite easy to do (as can be seen in Figure 2). The commands we need to run, together with the required environment variables, are listed below.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Once everything above is set up, the next thing we need to do is create our dataset and table in BigQuery. There are several ways to do this, but the easiest is to use the Google Cloud console, creating a dataset first. You can follow the steps in the following link to create a table and a schema.
Figure 3. Creating the dataset and table in BigQuery
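If you prefer to script this step rather than click through the console, here is a minimal sketch of my own (not a step from the tutorial) that creates the same dataset and table with the google-cloud-bigquery client. The dataset and table names (userlogs, logdata) and the field list mirror the schema string used by the pipeline code further down; a reasonably recent client library is assumed (older versions want a Dataset object rather than a plain ID).

from google.cloud import bigquery

# Create the dataset and table used by the pipeline below.
client = bigquery.Client(project="user-logs-237110")
dataset = client.create_dataset("userlogs")  # newer clients accept a plain dataset ID

fields = ["remote_addr", "timelocal", "request_type", "status",
          "body_bytes_sent", "http_referer", "http_user_agent"]
schema = [bigquery.SchemaField(name, "STRING") for name in fields]
client.create_table(bigquery.Table(dataset.table("logdata"), schema=schema))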
Setting up our streaming data pipeline
Pub/Sub is a vital component of our pipeline, since it allows multiple independent applications to interact with each other. In particular, it works as a middleman that lets us send and receive messages between applications. The first thing we need to do is create a topic: we simply go to Pub/Sub in the console and click CREATE TOPIC (see the figure below).
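The topic can also be created from code. Below is a rough sketch of mine using the same pubsub_v1 client that publish.py uses later; note that it relies on the older client API, and newer google-cloud-pubsub releases expect create_topic(name=topic_path) instead of a positional argument.

from google.cloud import pubsub_v1

# Create the "userlogs" topic in the tutorial's project.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("user-logs-237110", "userlogs")
topic = publisher.create_topic(topic_path)
print("Created topic: {}".format(topic.name))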
The code below calls the script we wrote earlier to generate log lines, then connects to Pub/Sub and sends the logs to our topic. All we need to do is create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this in our Google console with:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time

PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())

if __name__ == '__main__':
    # Publish a fake log line every 1-2 seconds, forever.
    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
As soon as the file is running, we will see the log data being printed to the console and published to the topic. We can use CTRL + C to stop it.
Figure 4. Running publish_logs.py
Coding up our pipeline
Now that we have the initial setup out of the way, we can get to the fun part and code up our pipeline using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p).
Once we have created the pipeline object, we can apply transform after transform using the pipe (|) operator. In general, the code looks like the template below.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
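To make the template concrete, here is a tiny self-contained sketch of my own (the element values and transform labels are made up, not from the tutorial) that chains three transforms with the pipe operator and runs on the local DirectRunner:

import apache_beam as beam

def show(row):
    print(row)

# Three transforms chained with |, exactly as in the template above.
with beam.Pipeline() as p:
    (p
     | 'Create' >> beam.Create(['127.0.0.1,200', '10.0.0.1,404'])
     | 'SplitFields' >> beam.Map(lambda line: line.split(','))
     | 'PrintRows' >> beam.Map(show))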
In the code below, we create a custom function called regex_clean, which scans the data and, for each expression in the PATTERNS list, extracts the matching string using the re.search function; the function returns a comma-separated string. If you are not a regular-expression expert, I recommend looking at a regex tutorial and experimenting in a notebook to check that the code works. After that, we define a custom ParDo function called Split, which is a kind of Beam transform for doing parallel processing. In it we use Python's datetime library to reformat the timestamp so that BigQuery will accept it. Lastly, we write the data out with the WriteToBigQuery function, which simply appends our data to the table. Below you can see both a batch DataFlow job and a streaming DataFlow job; the only difference between the batch and streaming code is that in the batch job we read the CSV from src_path using Beam's ReadFromText function.
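To get a feel for what regex_clean pulls out, here is a quick sanity check of mine for the first two patterns against a single made-up log line (the line is hypothetical; real ones come from the stream_logs generator):

import re

# A fabricated line in the shape the generator emits.
line = '192.168.1.1 - - [30/Apr/2019:21:11:42] "GET /index.html HTTP/1.1" 200 1024'

ip = re.search(r'(^\S+\.[\S+\.]+\S+)\s', line).group()    # leading IP address
timestamp = re.search(r'(?<=\[).+?(?=\])', line).group()  # text between [ and ]
print(ip.strip() + ' | ' + timestamp)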
Batch DataFlow Job (batch processing)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT = 'user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"

def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res

class Split(beam.DoFn):
    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main():
    p = beam.Pipeline(options=PipelineOptions())
    (p
     | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
     | "clean address" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Streaming DataFlow Job (stream processing)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re

PROJECT = "user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"

def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res

class Split(beam.DoFn):
    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions())
    (p
     | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
     | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
     | "Clean Data" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Running the pipeline
We can run the pipeline in a few different ways. If we wanted to, we could just run it locally from a terminal while logged in to GCP remotely:
python main_pipeline_stream.py \
--input_topic "projects/user-logs-237110/topics/userlogs" \
--streaming
However, we are going to run it on DataFlow. We can do that with the command below, after setting the following required parameters:

project - the ID of your GCP project.
runner - the pipeline runner that will parse your program and construct your pipeline. For cloud execution, this must be DataflowRunner.
staging_location - a path to Cloud Dataflow cloud storage for staging the code packages needed by the workers executing the job.
temp_location - a path to Cloud Dataflow cloud storage for staging temporary job files created while the pipeline is running.
streaming
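As an aside of mine, the same parameters can also be supplied in code instead of on the command line; a rough sketch (the bucket and project values are placeholders, as above) looks like this:

from apache_beam.options.pipeline_options import PipelineOptions

# Equivalent to passing the flags shown below on the command line.
options = PipelineOptions(
    runner='DataflowRunner',
    project='<YOUR-PROJECT>',
    temp_location='gs://<YOUR-BUCKET>/tmp',
    staging_location='gs://<YOUR-BUCKET>/staging',
    streaming=True,
)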
python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming
While this program is running, we can go to the DataFlow tab in the Google console and view our pipeline. Clicking into the job, we should see something like Figure 4. For debugging purposes, it can be very helpful to open Logs and then Stackdriver to view detailed logs; this has helped me sort out pipeline problems on more than one occasion.
Figure 4: Our pipeline in DataFlow
Accessing our data in BigQuery
So, our data should now be flowing into our BigQuery table. To confirm this, we can go to BigQuery and view the data: after running the query below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can carry out further analysis, share the data with colleagues, and start answering business questions.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5. BigQuery
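If you would rather check from code than from the web UI, a small sketch of mine using the same google-cloud-bigquery client that the pipeline files already import would be:

from google.cloud import bigquery

# Run the same query from Python and print the first rows.
client = bigquery.Client(project='user-logs-237110')
query = 'SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10'
for row in client.query(query).result():
    print(dict(row))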
Conclusion
Hopefully, this provides a useful example of building a streaming data pipeline, and also of finding ways to make data more accessible. Having the data in this format gives us many advantages. We can now start answering useful questions such as: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with most? And do errors occur where there should be none? These are the kinds of questions an organization will be interested in, and based on the insights we can drive improvements to the product and increase user engagement.
Beam is genuinely useful for this kind of computation, and there are a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on that analysis, or you might have sensor data coming in from vehicles and want to compute the level of traffic. You could also, say, be a gaming company collecting data on users and using it to build dashboards tracking key metrics. OK, folks, that is it for this post; thanks for reading, and for those who want to see the full code, the link to my GitHub is below.
Source: www.habr.com