Hello everyone. We are sharing a translation of the final part of this article, prepared especially for students of the course.
Apache Beam thiab DataFlow rau Real-Time Pipelines
Setting up Google Cloud
Note: I used Google Cloud Shell to run the pipeline and publish the custom log data, because I had trouble running the pipeline in Python 3. Google Cloud Shell uses Python 2, which plays better with Apache Beam.
To get the pipeline up and running, we need to dig into the setup a bit. For those of you who have never used GCP before, you will need to complete the 6 steps below.
After that, we need to upload our files to Google Cloud Storage and copy them to our Google Cloud Shell. Uploading to cloud storage is trivial (instructions can be found here).
Figure 2
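For reference, the upload can also be done entirely from the command line; a minimal sketch, assuming a bucket named <YOUR-BUCKET> and that the scripts sit in the current directory:

```shell
# Create a bucket (skip if it already exists), then upload the scripts.
gsutil mb gs://<YOUR-BUCKET>
gsutil cp *.py gs://<YOUR-BUCKET>/
```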
The commands for copying the files and installing the required libraries are listed below.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creating our database and table
Once we have completed all of the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the easiest is to use the Google Cloud console and first create the dataset. You can follow the steps below.
Figure 3. Table layout
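If you prefer the command line to the console, the same dataset and table can be sketched with the bq tool; the dataset and table names below match what the pipeline code writes to, and the schema is the same one used in the pipeline scripts:

```shell
# Create the dataset, then a table whose schema matches the pipeline output.
bq mk userlogs
bq mk --table userlogs.logdata \
  remote_addr:STRING,timelocal:STRING,request_type:STRING,status:STRING,body_bytes_sent:STRING,http_referer:STRING,http_user_agent:STRING
```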
Publishing the user log data
Pub/Sub is a critical part of our pipeline because it allows multiple independent applications to communicate with each other. In particular, it acts as an intermediary that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
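As an alternative to clicking through the console, the topic can also be created from Cloud Shell with the gcloud CLI:

```shell
# Create the Pub/Sub topic the publisher script will write to.
gcloud pubsub topics create userlogs
```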
The code below calls our script to generate the log data defined above, and then connects to Pub/Sub and sends the logs. All we need to do is create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with the topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this from our Google console using:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time

PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic_path, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())

if __name__ == '__main__':
    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
While the data is streaming, we can watch the log output in the console, as shown in the figure below. This script will keep running until we use CTRL+C to stop it.
Figure 4. Output of publish_logs.py
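To double-check that messages are actually reaching Pub/Sub, one option is to attach a temporary pull subscription to the topic and pull a few messages; the subscription name userlogs-debug below is hypothetical:

```shell
# Create a throwaway subscription and pull a handful of messages from it.
gcloud pubsub subscriptions create userlogs-debug --topic=userlogs
gcloud pubsub subscriptions pull userlogs-debug --limit=5 --auto-ack
```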
Writing our pipeline code
Now that we have everything set up, we can get to the fun part: coding our pipeline using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have created the pipeline object, we can apply multiple transforms one after another using the pipe operator (|). In general, the workflow looks like the picture below.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
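To build intuition for why this chaining reads the way it does, here is a toy, Beam-free sketch: the ToyPCollection class and both transforms are made up purely for illustration, but `|` handing the current collection to the next transform is roughly the mental model behind Beam's PCollections.

```python
class ToyPCollection:
    """A stand-in for a Beam PCollection, for illustration only."""
    def __init__(self, values):
        self.values = values

    def __or__(self, fn):
        # Apply the "transform" to every element, like a beam.Map would.
        return ToyPCollection([fn(v) for v in self.values])

output = (ToyPCollection(["10", "20", "30"])
          | int                  # first transform: parse the strings
          | (lambda x: x * 2))   # second transform: double each value
print(output.values)  # [20, 40, 60]
```

The real operator works on a deferred graph of transforms rather than eagerly on lists, but the left-to-right reading is the same.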
In our code, we will create two custom functions. The regex_clean function scans the data and retrieves the matching strings based on the PATTERNS list using the re.search function; it returns a comma-separated string. If you are not a regular-expression expert, I recommend checking this out. Note that we import datetime inside a function for it to work: I was getting an import error at the beginning of the file, which was odd. The resulting list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for both the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and streaming code is that in the batch job we read the CSV from src_path using the ReadFromText function from Beam.
Batch DataFlow Job (batch processing)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT = 'user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"

def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res

class Split(beam.DoFn):
    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main():
    p = beam.Pipeline(options=PipelineOptions())
    (p
       | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
       | "clean address" >> beam.Map(regex_clean)
       | 'ParseCSV' >> beam.ParDo(Split())
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Streaming DataFlow Job (stream processing)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re

PROJECT = "user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"

def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res

class Split(beam.DoFn):
    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions())
    (p
       | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
       | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
       | "Clean Data" >> beam.Map(regex_clean)
       | 'ParseCSV' >> beam.ParDo(Split())
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Starting the pipeline
We can start the pipeline in several different ways. If we wanted to, we could simply run it locally from the terminal while logged in to GCP remotely.
python main_pipeline_stream.py \
  --input_topic "projects/user-logs-237110/topics/userlogs" \
  --streaming
However, we are going to run it using DataFlow. We can do that with the command below, setting the following required options:
- project — the ID of your GCP project.
- runner — the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
- staging_location — the path to Cloud Dataflow storage for staging the code packages needed by the workers executing the job.
- temp_location — the path to Cloud Dataflow storage for temporary job files created while the pipeline runs.
- streaming — the flag that runs the pipeline in streaming mode.
python main_pipeline_stream.py \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location $BUCKET/tmp \
  --staging_location $BUCKET/staging \
  --streaming
While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. When we click into the pipeline, we should see something similar to Figure 4. For debugging purposes, it can be very helpful to go to Logs and then Stackdriver to view detailed logs. This has helped me resolve pipeline issues on a number of occasions.
Figure 4: Beam pipeline
Accessing our data in BigQuery
So, we should now have a pipeline with data flowing into our table. To verify this, we can go to BigQuery and look at the data. After running the command below, you should see the first few rows of the dataset. Now that we have the data stored in BigQuery, we can carry out further analysis, share the data with colleagues, and start answering business questions.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5: BigQuery
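The same check can be run from Cloud Shell without opening the BigQuery UI; a quick sketch using the bq CLI:

```shell
# Run the query in standard SQL and print the first rows to the terminal.
bq query --use_legacy_sql=false \
  'SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;'
```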
Conclusion
Hopefully this post serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing the data in this format gives us many advantages. We can now start answering important questions such as: how many people use our product? Is the user base growing over time? What aspects of the product do people interact with most? And are there errors where there should not be? These are the questions that will interest the organization, and based on the insights drawn from their answers, we can improve the product and increase user engagement.
Beam is genuinely useful for this kind of exercise, and there are a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on that analysis; perhaps you have sensor data coming in from vehicles and want to compute traffic levels. You could also, for example, be a gaming company collecting user data and using it to build dashboards that track key metrics. Okay folks, that is a topic for another post. Thanks for reading, and for those who would like to see the full code, below is the link to my GitHub.
That's all.
Source: www.hab.com