Hello everyone! We are sharing a translation of the final part of the article, prepared especially for students of the course.
Apache Beam and DataFlow for Real-Time Pipelines
Setting up Google Cloud
Note: I ran into problems running the pipeline with Python 3, so I used Google Cloud Shell to run the pipeline and publish the custom log data. Google Cloud Shell uses Python 2, which plays more consistently with Apache Beam.
To get the pipeline started, we need to dig into the configuration a little. If you have never used GCP before, you will need to follow the six steps outlined in this article.
After that, we need to upload our scripts to Google Cloud Storage and copy them into Google Cloud Shell. Uploading to cloud storage is fairly trivial (a description can be found here).
Figure 2
The commands needed to copy our files and install the required libraries are shown below:
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creating a database and table
Once all the setup steps are done, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console and first create a dataset. You can follow the steps at this link to create a table and a schema.
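The table can also be created programmatically. The sketch below is my own addition (the helper and the commented-out client calls are not part of the original scripts): it parses the same schema string that the pipeline code uses later into column definitions; the actual API calls are left commented out because they require GCP credentials.

```python
# Hypothetical helper: turn the pipeline's schema string into (name, type) pairs.
schema = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, '
          'status:STRING, body_bytes_sent:STRING, http_referer:STRING, '
          'http_user_agent:STRING')

def parse_schema(schema_str):
    """Split 'name:TYPE, name:TYPE, ...' into a list of (name, TYPE) tuples."""
    return [tuple(field.strip().split(':')) for field in schema_str.split(',')]

columns = parse_schema(schema)
print(columns[0])  # ('remote_addr', 'STRING')

# With google-cloud-bigquery installed and credentials configured,
# creating the dataset and table would look roughly like (untested sketch):
# from google.cloud import bigquery
# client = bigquery.Client(project='<YOUR-PROJECT>')
# dataset = client.create_dataset('userlogs')
# table_schema = [bigquery.SchemaField(name, col_type) for name, col_type in columns]
# client.create_table(bigquery.Table(dataset.table('logdata'), schema=table_schema))
```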
Figure 3. Table layout
Publishing the user log data
Pub/Sub is a critical component of our pipeline, because it lets multiple independent applications interact with each other. Specifically, it acts as an intermediary that lets us send and receive messages between applications. The first thing we need to do is create a topic. To do that, go to Pub/Sub in the console and click CREATE TOPIC.
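The broker role that Pub/Sub plays can be illustrated with a toy in-memory sketch (purely conceptual, not the Pub/Sub API; all names here are mine): publishers push messages to a named topic, and every subscriber attached to that topic receives them independently.

```python
from collections import defaultdict

class ToyBroker:
    """A minimal stand-in for a message broker: a topic fans each
    published message out to all registered subscriber callbacks."""
    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    def publish(self, topic, message):
        for callback in self.subscribers[topic]:
            callback(message)

broker = ToyBroker()
received = []
broker.subscribe('userlogs', received.append)   # e.g. our DataFlow job
broker.subscribe('userlogs', print)             # e.g. a monitoring app
broker.publish('userlogs', '127.0.0.1 - - [10/Jun/2019:13:55:36] "GET / HTTP/1.0" 200 512')
```

The two subscribers never talk to each other; they only share the topic name, which is exactly what decouples the log generator from the pipeline.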
The code below calls the script that generates the log data defined above, then connects to Pub/Sub and sends the logs. All we need to do is create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure those files are in the same folder, or you will get an import error. We can then run this from the Google console with:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time

PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())

if __name__ == '__main__':
    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
As soon as we run the file, we will see the log data printed to the console, as shown in the figure below. The script keeps running until we terminate it with CTRL+C.
Figure 4. Output of publish_logs.py
Writing the pipeline code
Now that everything is set up, we can get to the fun part: coding the pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once the pipeline object is created, we can apply several functions one after another using the pipe (|) operator. In general, the workflow looks like the diagram below.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
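To see why the pipe notation reads so naturally, here is a toy (non-Beam) sketch of the same idea, with class names of my own invention: overloading | so that each stage consumes the output of the previous one.

```python
class Transform:
    """Toy pipeline stage: `value | transform` feeds value through fn."""
    def __init__(self, fn):
        self.fn = fn

    def __ror__(self, value):
        # Called for `value | transform` when the left side is a plain object.
        return self.fn(value)

# [Initial Input] | [First Transform] | [Second Transform]
strip = Transform(str.strip)
split = Transform(lambda s: s.split(','))
result = '  a,b,c ' | strip | split
print(result)  # ['a', 'b', 'c']
```

Beam's real PCollections and PTransforms are far richer, but the chaining mechanics read the same way.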
In the code, we create two custom functions. The regex_clean function scans the data and, using re.search, extracts the appropriate fields from each row based on the PATTERNS list; the function returns a comma-separated string. If you are not a regular-expression expert, I recommend checking out this tutorial and practicing in a notebook to validate the code. Next, we define a DoFn class called Split, in which we have to import datetime inside the function itself for it to work; importing it at the top of the file was producing an import error, which was odd. Split converts each row into a list of dictionaries with keys matching our BigQuery columns. That list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the batch DataFlow job is given below. The only difference between the batch code and the streaming code is that in batch we read a CSV from src_path using Beam's ReadFromText function.
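As a quick sanity check of one of those patterns (worth doing in a notebook if regexes are unfamiliar), the lookbehind/lookahead pair below pulls the timestamp out of the square brackets of a sample log line:

```python
import re

# Pattern for everything between '[' and ']' (the request timestamp).
TIMESTAMP_PATTERN = r'(?<=\[).+?(?=\])'

line = '192.168.1.1 - - [10/Jun/2019:13:55:36] "GET /index.html HTTP/1.0" 200 512'
match = re.search(TIMESTAMP_PATTERN, line)
print(match.group())  # 10/Jun/2019:13:55:36
```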
Batch DataFlow job (batch processing)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT = 'user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"

def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res

class Split(beam.DoFn):
    def process(self, element):
        # datetime is imported inside the function; importing it at the
        # top of the file was causing an import error.
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main():
    p = beam.Pipeline(options=PipelineOptions())
    (p
        | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
        | "clean address" >> beam.Map(regex_clean)
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
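The timestamp conversion inside Split can be checked in isolation; it turns the bracketed log timestamp into a BigQuery-friendly "YYYY-MM-DD HH:MM:SS" string:

```python
from datetime import datetime

raw = "10/Jun/2019:13:55:36"                       # as produced by regex_clean
d = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S")    # parse the log format
print(d.strftime("%Y-%m-%d %H:%M:%S"))             # 2019-06-10 13:55:36
```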
Streaming DataFlow job (stream processing)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re

PROJECT = "user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"

def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res

class Split(beam.DoFn):
    def process(self, element):
        # datetime is imported inside the function; importing it at the
        # top of the file was causing an import error.
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions())
    (p
        | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
        | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
        | "Clean Data" >> beam.Map(regex_clean)
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
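One detail worth noting in main(): parse_known_args consumes only the flags it knows about (--input_topic, --output) and returns everything else untouched, which is what lets runner flags such as --streaming pass through to PipelineOptions. A standalone check of that behavior:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")

# Unrecognized flags are returned in a second list instead of raising an error.
known_args, pipeline_args = parser.parse_known_args(
    ["--input_topic", "projects/p/topics/t", "--runner", "DataflowRunner", "--streaming"])
print(known_args.input_topic)  # projects/p/topics/t
print(pipeline_args)           # ['--runner', 'DataflowRunner', '--streaming']
```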
Running the pipeline
The pipeline can be run in several different ways. If we wanted to, we could simply run it locally from a terminal while logged in to GCP remotely:

python main_pipeline_stream.py \
--input_topic "projects/user-logs-237110/topics/userlogs" \
--streaming
However, we are going to run it with DataFlow. We can do that with the command below, setting the following required parameters:
project — the ID of your GCP project.
runner — the pipeline runner that analyzes your program and constructs the pipeline. To run in the cloud, you must specify DataflowRunner.
staging_location — a path to Cloud Dataflow cloud storage where the code packages needed by the workers executing the job are indexed.
temp_location — a path to Cloud Dataflow cloud storage where temporary job files created while the pipeline runs are stored.
streaming — the flag that makes the pipeline run in streaming mode.
python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming
While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. Clicking on the pipeline, we should see something like Figure 4. For debugging, it can be very helpful to go to Logs and then to Stackdriver to view the detailed logs. This has helped me resolve pipeline issues in a number of cases.
Figure 4: The Beam pipeline
Accessing the data in BigQuery
So, we should now have a pipeline running with data flowing into our table. To verify this, go to BigQuery and look at the data. The query below should show the first few rows of the dataset. Now that the data is stored in BigQuery, we can run further analysis, share the data with colleagues, and start answering business questions.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5: BigQuery
Conclusion
I hope this post serves as a useful example of building a streaming data pipeline, and of finding ways to make data more accessible. Storing data in this format gives us many advantages. Now we can start answering important questions, such as: How many people use our product? Is the user base growing over time? Which aspects of the product do people interact with the most? And are there errors where there should not be? These are the questions that will be of interest to an organization, and based on the insights derived from the answers, we can improve the product and increase user engagement.
Beam is genuinely useful for this kind of exercise, and there are plenty of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on that analysis, or perhaps you have sensor data coming in from vehicles and want to compute traffic levels. You could also, say, be a gaming company that collects user data and uses it to build dashboards tracking key metrics. Okay, folks, but that is a topic for another post. Thanks for reading! For those who would like to see the full code, below is a link to my GitHub.
That's all!
Source: habr.com