Hello everyone. We are sharing the translation of the final part of the article, prepared especially for students of the course.
Apache Beam and DataFlow for Real-Time Pipelines
Setting up Google Cloud
Note: I used Google Cloud Shell to run the pipeline and publish the custom log data, because I had trouble running the pipeline under Python 3. Google Cloud Shell used Python 2, which (at the time of writing) worked better with Apache Beam.
To get the pipeline started, we need to dig into the settings a little. Those of you who have not used GCP before will need to follow the 6 setup steps outlined on this page.
After that, we need to upload our scripts to Google Cloud Storage and copy them into our Google Cloud Shell. Uploading to Cloud Storage is fairly trivial (a description can be found here).
Figure 2
The commands to copy the files and install the required libraries are below.
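# Copy the files from Cloud Storage into the Cloud Shell working directory
gsutil cp 'gs://<YOUR-BUCKET>/*' .
# Install the necessary requirements
sudo pip install -U pip
sudo pip install 'apache-beam[gcp]' oauth2client==3.0.0
sudo pip install Faker==1.0.2
# Environment variables (bucket name only, without the gs:// prefix)
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>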
Creating our dataset and table
Once all of the setup work is done, the next step is to create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console and create the dataset first. You can then follow the console steps to create the table; its schema is shown in Figure 3.
Figure 3. Table schema
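The pipeline code further down hands this same table schema to WriteToBigQuery as a single comma-separated "name:TYPE" string. As a minimal sketch (parse_schema and check_row are hypothetical helpers, not part of Beam or the BigQuery client), the snippet below shows how that string maps onto the seven columns and how a row dictionary could be checked against it before loading.

```python
# Hypothetical helpers for the "name:TYPE, name:TYPE" schema string
SCHEMA = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, '
          'status:STRING, body_bytes_sent:STRING, http_referer:STRING, '
          'http_user_agent:STRING')


def parse_schema(schema):
    """Split a 'name:TYPE, ...' string into a list of (name, type) pairs."""
    fields = []
    for part in schema.split(','):
        name, field_type = part.strip().split(':')
        fields.append((name, field_type))
    return fields


def check_row(row, fields):
    """True if row has exactly the schema's columns and STRING columns hold str."""
    names = [name for name, _ in fields]
    if sorted(row) != sorted(names):
        return False
    return all(isinstance(row[name], str)
               for name, field_type in fields if field_type == 'STRING')


if __name__ == '__main__':
    fields = parse_schema(SCHEMA)
    print(len(fields))  # 7
    print(fields[0])    # ('remote_addr', 'STRING')
```

Checking rows this way is optional; it simply catches a missing or misspelled key before BigQuery rejects the insert.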
Publishing user log data
Pub/Sub is a vital component of our pipeline because it lets multiple independent applications communicate with one another. Specifically, it acts as a middleman that lets us send and receive messages between applications. The first thing to do is create a topic: go to Pub/Sub in the console and click CREATE TOPIC.
The code below calls our script to generate the log data defined earlier, then connects to Pub/Sub and sends the log lines to it. All we need to do is create a PublisherClient object, build the path to the topic with the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure both files are in the same folder; otherwise you will get an import error. We can then run this from the Google Cloud Shell console with:
python publish.py
from stream_logs import generate_log_line
import logging
import random
import time

from google.cloud import pubsub_v1

PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)


def publish(publisher, topic, message):
    # Pub/Sub message payloads must be bytes
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)


def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        # On success the future resolves to the message ID
        print(message_future.result())


if __name__ == '__main__':
    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)
        # Wait 1 or 2 seconds before publishing the next line
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
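The done-callback pattern in this script can be tried without a Pub/Sub connection. In the sketch below, the standard library's concurrent.futures.Future stands in for the future returned by publisher.publish: it exposes exception(), result() and add_done_callback() under the same names, but it is an analogy rather than the Pub/Sub client, and make_callback is a hypothetical helper. A real publish future resolves to the server-assigned message ID; the ID used below is made up.

```python
from concurrent.futures import Future


def make_callback(log):
    """Build a done-callback that records the outcome of a publish future."""
    def callback(fut):
        # exception() returns None when the future completed successfully
        exc = fut.exception(timeout=30)
        if exc is not None:
            log.append('failed: {}'.format(exc))
        else:
            log.append('published id {}'.format(fut.result()))
    return callback


if __name__ == '__main__':
    log = []
    ok = Future()
    ok.add_done_callback(make_callback(log))
    ok.set_result('12345')  # made-up message ID
    bad = Future()
    bad.add_done_callback(make_callback(log))
    bad.set_exception(RuntimeError('topic not found'))
    print(log)  # ['published id 12345', 'failed: topic not found']
```

Because the callback runs when the future completes, the publishing loop never blocks waiting for Pub/Sub to acknowledge each message.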
While the file is running, we can see the log data being printed to the console, as in the figure below. The script keeps running until we press CTRL+C to stop it.
Figure 4. publish_logs.py output
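The stream_logs module comes from the earlier part of the article and is not reproduced here. For readers following along without it, a stand-in could look like the sketch below. It is hypothetical: it uses only the standard library (not Faker), and the values and exact line layout are assumptions modeled on the Nginx-style lines that the pipeline's regular expressions expect.

```python
import random
from datetime import datetime

# Hypothetical stand-in for stream_logs.generate_log_line; all values are made up
REQUEST_TYPES = ['GET', 'POST', 'PUT', 'DELETE']
PATHS = ['/', '/products', '/cart', '/checkout', '/login']
STATUSES = ['200', '201', '301', '404', '500']
REFERERS = ['http://www.example.com', 'https://www.google.com']
USER_AGENTS = ['Mozilla/5.0 (X11; Linux x86_64)',
               'Mozilla/5.0 (Windows NT 10.0; Win64; x64)']

LINE = ('{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" '
        '{status} {body_bytes_sent} "{http_referer}" "{http_user_agent}"')


def generate_log_line():
    """Return one fake access-log line stamped with the current local time."""
    remote_addr = '.'.join(str(random.randint(1, 254)) for _ in range(4))
    # Same timestamp layout that Split later parses with "%d/%b/%Y:%H:%M:%S"
    time_local = datetime.now().strftime('%d/%b/%Y:%H:%M:%S')
    return LINE.format(
        remote_addr=remote_addr,
        time_local=time_local,
        request_type=random.choice(REQUEST_TYPES),
        request_path=random.choice(PATHS),
        status=random.choice(STATUSES),
        body_bytes_sent=random.randint(100, 5000),
        http_referer=random.choice(REFERERS),
        http_user_agent=random.choice(USER_AGENTS),
    )


if __name__ == '__main__':
    print(generate_log_line())
```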
Writing our pipeline code
Now that everything is set up, we can get to the fun part: coding our pipeline with Beam and Python. To build a Beam pipeline, we first create a pipeline object (p). Once the pipeline object exists, we can apply several transforms one after another using the pipe operator (|). In general, the workflow looks like this:
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
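The | and 'Label' >> transform syntax is ordinary Python operator overloading. The toy classes below are not Beam's implementation (Beam's PCollection and PTransform defer execution to a runner and do far more); they only mimic the syntax eagerly over an in-memory list, to show how a chain of labeled transforms composes. ToyPCollection and ToyMap are hypothetical names.

```python
class ToyPCollection:
    """In-memory stand-in for a PCollection (eager, single process)."""

    def __init__(self, elements):
        self.elements = list(elements)

    def __or__(self, transform):
        # collection | transform  ->  apply the transform, get a new collection
        return transform.expand(self)


class ToyMap:
    """Stand-in for beam.Map: apply fn to every element."""

    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label  # Beam uses labels to name steps; the toy only stores it

    def __rrshift__(self, label):
        # 'Label' >> ToyMap(...): str has no >> for ToyMap, so Python calls this
        return ToyMap(self.fn, label)

    def expand(self, pcoll):
        return ToyPCollection(self.fn(x) for x in pcoll.elements)


if __name__ == '__main__':
    p = ToyPCollection(['a,b', 'c,d'])
    out = (p
           | 'Upper' >> ToyMap(str.upper)
           | 'Split' >> ToyMap(lambda s: s.split(',')))
    print(out.elements)  # [['A', 'B'], ['C', 'D']]
```

Because >> binds more tightly than |, each 'Label' >> transform pair is built first and then piped into the collection.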
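In our code we create two custom functions. The regex_clean function scans the data and, for each pattern in the PATTERNS list, extracts the matching string using re.search. The function returns a comma-separated string. If you are not a regular-expression expert, I recommend working through a regex tutorial and testing the patterns in a notebook first. The second function, Split, is a class that inherits from Beam's DoFn and is applied with ParDo: it takes the comma-separated string produced by regex_clean and returns a dictionary whose keys match the column names of our BigQuery table. Note that I had to import datetime inside the function for it to work; importing it at the top of the file gave me an import error, which was strange. (A likely cause is that module-level imports in the main script are not available on remote workers unless the save_main_session pipeline option is enabled.) The resulting records are then passed to WriteToBigQuery, which simply appends our data to the table. Below is the code for the batch DataFlow job and for the streaming DataFlow job. The only difference between them is the source: the batch job reads the file from src_path using Beam's ReadFromText function, whereas the streaming job reads messages from the Pub/Sub topic.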
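Both custom steps can be checked locally on a single line before wiring them into Beam. The sketch below swaps in a different technique from the author's seven separate re.search calls: one regular expression with named groups covering the whole line, followed by the same timestamp conversion that Split performs. The sample line, the assumed Nginx-style layout and the parse_line name are illustrative; unlike regex_clean, this version keeps the full user-agent string.

```python
import re
from datetime import datetime

# One named group per BigQuery column (alternative to seven separate searches)
LOG_RE = re.compile(
    r'^(?P<remote_addr>\S+) \S+ \S+ '
    r'\[(?P<timelocal>[^\]]+)\] '
    r'"(?P<request_type>[^"]*)" '
    r'(?P<status>\d{3}) (?P<body_bytes_sent>\d+) '
    r'"(?P<http_referer>[^"]*)" "(?P<http_user_agent>[^"]*)"'
)


def parse_line(line):
    """Return a dict keyed by column name, or None if the line does not match."""
    match = LOG_RE.match(line)
    if match is None:
        return None
    row = match.groupdict()
    # Same conversion as Split: 30/Apr/2019:21:11:42 -> 2019-04-30 21:11:42
    ts = datetime.strptime(row['timelocal'], '%d/%b/%Y:%H:%M:%S')
    row['timelocal'] = ts.strftime('%Y-%m-%d %H:%M:%S')
    return row


if __name__ == '__main__':
    sample = ('192.168.1.10 - - [30/Apr/2019:21:11:42] "GET /cart HTTP/1.1" '
              '200 2018 "http://www.example.com" "Mozilla/5.0 (X11; Linux x86_64)"')
    print(parse_line(sample))
```

Returning a dict directly removes the need to split on commas and index by position, which is where column mix-ups tend to creep in.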
Batch DataFlow Job (batch processing)
import logging
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

PROJECT = 'user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"


def regex_clean(data):
    # One pattern per BigQuery column, in schema order: remote_addr, timelocal,
    # request_type, status, body_bytes_sent, http_referer, http_user_agent
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r'(?<=\s\d{3}\s)\d+',
                r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+', r'\"[A-Z][a-z]+']
    result = []
    for pattern in PATTERNS:
        match = re.search(pattern, data)
        # Keep a placeholder when nothing matches so column positions stay aligned
        result.append(match.group() if match else " ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    return ','.join(result)


class Split(beam.DoFn):
    def process(self, element):
        # Imported here: importing datetime at the top of the file gave an import error
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]


def main():
    p = beam.Pipeline(options=PipelineOptions())
    (p
     | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
     | "clean address" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
     )
    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Streaming DataFlow Job (stream processing)
import argparse
import logging
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

PROJECT = "user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):
    # One pattern per BigQuery column, in schema order: remote_addr, timelocal,
    # request_type, status, body_bytes_sent, http_referer, http_user_agent
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r'(?<=\s\d{3}\s)\d+',
                r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+', r'\"[A-Z][a-z]+']
    result = []
    for pattern in PATTERNS:
        match = re.search(pattern, data)
        # Keep a placeholder when nothing matches so column positions stay aligned
        result.append(match.group() if match else " ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    return ','.join(result)


class Split(beam.DoFn):
    def process(self, element):
        # Imported here: importing datetime at the top of the file gave an import error
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        # Same field order as the schema and the batch job
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic", default=TOPIC)
    parser.add_argument("--output")
    # Flags not declared above (--runner, --project, --streaming, ...) go to Beam
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p
     | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
     | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
     | "Clean Data" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
     )
    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Running the pipeline
We can run the pipeline in several different ways. If we wanted to, we could simply run it locally from the terminal while logged in to GCP remotely:
python main_pipeline_stream.py \
  --input_topic "projects/user-logs-237110/topics/userlogs" \
  --streaming
However, we are going to run it on DataFlow. We can do that with the command below, setting the following parameters:
project: the ID of your GCP project.
runner: the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
staging_location: the Cloud Storage path where Cloud Dataflow stages the code packages needed by the workers executing the job.
temp_location: the Cloud Storage path where Cloud Dataflow stores temporary job files created while the pipeline is running.
streaming: runs the pipeline in streaming mode.
python main_pipeline_stream.py \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp \
  --staging_location gs://$BUCKET/staging \
  --streaming
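The streaming script separates its own flags from those meant for Beam with argparse's parse_known_args, which returns a namespace of the declared arguments plus a list of everything it did not recognise. The standalone sketch below (split_args is a hypothetical name) shows that split on flags like the ones in the command above; a common pattern is to pass the leftover list on to PipelineOptions.

```python
import argparse


def split_args(argv):
    """Separate script-specific flags from flags meant for the Beam runner."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_topic',
                        default='projects/user-logs-237110/topics/userlogs')
    parser.add_argument('--output')
    # known: Namespace of the flags declared above; rest: unrecognised flags
    known, rest = parser.parse_known_args(argv)
    return known, rest


if __name__ == '__main__':
    known, beam_args = split_args([
        '--runner', 'DataflowRunner',
        '--project', 'my-project',
        '--temp_location', 'gs://my-bucket/tmp',
        '--streaming',
    ])
    print(known.input_topic)  # the default topic, since the flag was not passed
    print(beam_args)
```

Declaring --input_topic with a default means the same script runs unchanged whether or not the topic is passed on the command line.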
While this command is running, we can go to the DataFlow tab in the Google Cloud console and view our pipeline. Clicking on the pipeline should show something similar to Figure 4. For debugging, it is very helpful to go to Logs and then to Stackdriver to view detailed logs; this helped me resolve pipeline issues in a number of cases.
Figure 4: Beam pipeline
Accessing our data in BigQuery
We should now have a pipeline running, with data flowing into our table. To check this, we can go to BigQuery and look at the data. After running the query below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can run further analysis, share the data with colleagues, and start answering business questions.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5: BigQuery
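Business questions such as "how many people use the product each day?" usually come down to simple aggregations over this table. As a hedged illustration, the snippet below prototypes one such query (distinct client addresses per day, a rough proxy for daily users) against an in-memory SQLite table with the same seven columns and a few made-up rows. SQLite is only a local stand-in: the query would need the real table name, `user-logs-237110.userlogs.logdata`, and a check against BigQuery's SQL dialect before being run there.

```python
import sqlite3

COLUMNS = ['remote_addr', 'timelocal', 'request_type', 'status',
           'body_bytes_sent', 'http_referer', 'http_user_agent']

# Made-up sample rows in the shape produced by the pipeline
ROWS = [
    ('10.0.0.1', '2019-04-30 21:11:42', 'GET / HTTP/1.1', '200', '512',
     'http://www.example.com', 'Mozilla'),
    ('10.0.0.2', '2019-04-30 21:12:05', 'GET /cart HTTP/1.1', '200', '734',
     'http://www.example.com', 'Mozilla'),
    ('10.0.0.1', '2019-04-30 22:01:17', 'POST /checkout HTTP/1.1', '500', '0',
     'http://www.example.com', 'Mozilla'),
    ('10.0.0.3', '2019-05-01 09:30:00', 'GET / HTTP/1.1', '404', '128',
     'http://www.example.com', 'Mozilla'),
]

# timelocal is stored as a STRING, so its first 10 characters are the date
QUERY = '''
SELECT SUBSTR(timelocal, 1, 10) AS day,
       COUNT(DISTINCT remote_addr) AS visitors
FROM logdata
GROUP BY day
ORDER BY day
'''


def daily_visitors(rows):
    """Load rows into an in-memory table and run the daily-visitors query."""
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE logdata ({})'.format(
        ', '.join(c + ' TEXT' for c in COLUMNS)))
    conn.executemany('INSERT INTO logdata VALUES ({})'.format(
        ', '.join('?' * len(COLUMNS))), rows)
    result = conn.execute(QUERY).fetchall()
    conn.close()
    return result


if __name__ == '__main__':
    print(daily_visitors(ROWS))  # [('2019-04-30', 2), ('2019-05-01', 1)]
```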
Conclusion
I hope this post serves as a useful example of building a streaming data pipeline and of finding ways to make data more accessible. Storing data in this format has many advantages. We can now start answering useful questions: How many people use our product? Is the user base growing over time? Which parts of the product do people interact with most? And are there errors where there shouldn't be? These are the questions any organization would want answered. Based on the insights these answers provide, we can improve the product and increase user engagement.
Beam is very useful for this kind of exercise, and it has many other interesting use cases. For example, you might want to analyze stock tick data in real time and trade based on the analysis, or you might have sensor data coming from vehicles and want to compute traffic-level statistics. You could also be, for example, a gaming company that collects user data and uses it to build dashboards tracking key metrics. But that is a topic for another post. Thanks for reading; for those who want to see the full code, the link to my GitHub is below.
That's all.
Source: will.com