Hello everyone. We are sharing a translation of the final part of the article, prepared specially for students of the course.
Apache Beam and DataFlow for Real-Time Pipelines
Setting up Google Cloud
Note: I used Google Cloud Shell to run the pipeline and publish the user log data because I was having trouble running the pipeline in Python 3. Google Cloud Shell uses Python 2, which is more compatible with Apache Beam.
To get the pipeline running, we need to dig into the settings a little. For those of you who have not used GCP before, you will need to follow the 6 steps outlined on this page.
After this, we will need to upload our scripts to Google Cloud Storage and copy them to our Google Cloud Shell. Uploading to cloud storage is quite trivial (a description can be found here).
Figure 2
The commands we need to copy the files and install the required libraries are listed below.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creating our database and table
Once we have completed all the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console and create the dataset first. You can follow the steps below to create the table with its schema.
Figure 3. Table schema
Publishing user log data
Pub/Sub is a vital component of our pipeline because it allows multiple independent applications to communicate with each other. In particular, it works as a middleman that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
The code below calls our script to generate the log data described above, then connects to Pub/Sub and sends the logs. The only things we need to do are create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and data. Please note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this from our Google console using:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time

PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)


def publish(publisher, topic, message):
    # Pub/Sub payloads must be bytes.
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)


def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    error = message_future.exception(timeout=30)
    if error:
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, error))
    else:
        print(message_future.result())


if __name__ == '__main__':
    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)
        # Wait 1 or 2 seconds between messages.
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
Once the file is running, we will be able to see the log data being printed to the console, as shown in the figure below. This script will keep running until we stop it with CTRL+C.
Figure 4. Output of publish_logs.py
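To make the two preparation steps in the publisher concrete, here is a small standard-library sketch of what they produce. The helper build_topic_path is a hypothetical stand-in that mirrors the projects/<project>/topics/<topic> format used for TOPIC in the streaming job; it is not the client library's own code, and the sample message text is invented.

```python
def build_topic_path(project_id, topic):
    # Hypothetical helper: mirrors the fully qualified topic name format
    # "projects/<project>/topics/<topic>".
    return "projects/{}/topics/{}".format(project_id, topic)


def encode_message(line):
    # Message payloads are sent as bytes, so text is encoded before publishing.
    return line.encode("utf-8")


topic_path = build_topic_path("user-logs-237110", "userlogs")
payload = encode_message("sample log line")  # invented sample text
print(topic_path)
print(payload)
```

The streaming job later reverses the second step by decoding each payload back into text before cleaning it.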
Writing our pipeline code
Now that we have everything set up, we can start the fun part: coding our pipeline using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have created the pipeline object, we can apply multiple functions one after the other using the pipe operator (|). In general, the workflow looks like the outline below.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
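If the pipe syntax looks unfamiliar, the toy sketch below shows how Python operator overloading can make this kind of chaining work. ToyTransform, toy_map and toy_filter are hypothetical names made up for this illustration and operate on plain lists; this is not how Beam itself is implemented.

```python
class ToyTransform:
    """Hypothetical stand-in for a transform; not Beam's real class."""

    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label

    def __ror__(self, data):
        # Called for `data | transform`, because list does not define `|`.
        return self.fn(data)

    def __rrshift__(self, label):
        # Called for `'label' >> transform`, because str does not define `>>`.
        return ToyTransform(self.fn, label)


def toy_map(fn):
    return ToyTransform(lambda items: [fn(x) for x in items])


def toy_filter(pred):
    return ToyTransform(lambda items: [x for x in items if pred(x)])


# `>>` binds tighter than `|`, so each label attaches to its transform first.
result = ([1, 2, 3, 4]
          | 'Double' >> toy_map(lambda x: x * 2)
          | 'KeepLarge' >> toy_filter(lambda x: x > 4))
print(result)
```

Each step receives the output of the previous one, which is the same left-to-right reading the outline above suggests.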
In our code, we will create two custom functions. The first is regex_clean, which scans the data and extracts the matching parts of each line based on the PATTERNS list, using the re.search function. The function returns a comma-separated string. If you are not a regular-expression expert, I recommend reading up on them first. The second is Split, a custom DoFn that takes the comma-separated string and returns a list containing a dictionary whose keys are the column names of our BigQuery table. Note that datetime is imported inside the function to make it work. I was getting an import error when it was imported at the beginning of the file, which was strange. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and streaming code is that in batch we read the CSV from src_path using the ReadFromText function from Beam.
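As a small illustration of the two steps, the sketch below pulls a bracketed timestamp out of a line with a lookaround pattern of the kind used in PATTERNS, then converts it with the same strptime and strftime formats as Split. The sample line is invented for this example (the real format comes from stream_logs.py), and the helper names are hypothetical.

```python
import re
from datetime import datetime

# Invented sample line; the real log format is defined in stream_logs.py.
sample = '192.0.2.10 - - [12/Mar/2019:10:15:30] "GET /home HTTP/1.1"'


def extract_timestamp(line):
    # The lookbehind and lookahead keep the brackets out of the match.
    match = re.search(r'(?<=\[).+?(?=\])', line)
    return match.group() if match else None


def reformat_timestamp(raw):
    # Same input and output formats as the Split class.
    parsed = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S")
    return parsed.strftime("%Y-%m-%d %H:%M:%S")


raw = extract_timestamp(sample)
print(raw)
print(reformat_timestamp(raw))
```

Testing each pattern on a few sample lines like this, before running the whole pipeline, makes regex problems much easier to spot.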
Batch DataFlow Job (batch processing)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT = 'user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"


def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for pattern in PATTERNS:
        # re.search returns None when nothing matches; append a blank
        # placeholder so every row keeps one field per pattern.
        reg_match = re.search(pattern, data)
        result.append(reg_match.group() if reg_match else " ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):
    def process(self, element):
        # datetime is imported here; the import at the top of the file failed.
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]


def main():
    p = beam.Pipeline(options=PipelineOptions())
    (p
     | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
     | "clean address" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
     )
    p.run()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
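The dictionaries returned by Split need to carry the column names listed in the schema string. A quick local check like the one below can catch a misspelled key before the job is submitted. This is only a sketch using the standard library; schema_fields and missing_or_extra are hypothetical helpers, and the example row is invented.

```python
schema = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, '
          'status:STRING, body_bytes_sent:STRING, http_referer:STRING, '
          'http_user_agent:STRING')


def schema_fields(schema_string):
    # "name:TYPE, name:TYPE" -> ["name", "name"]
    return [part.strip().split(":")[0] for part in schema_string.split(",")]


def missing_or_extra(row, schema_string):
    # Returns (columns the row lacks, keys the schema does not know).
    expected = set(schema_fields(schema_string))
    return sorted(expected - set(row)), sorted(set(row) - expected)


# Invented row with a deliberate typo to show what the check reports.
row = {name: "" for name in schema_fields(schema)}
row.pop("status")
row["statuz"] = "200"

print(schema_fields(schema))
print(missing_or_extra(row, schema))
```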
Streaming DataFlow Job (stream processing)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re

PROJECT = "user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for pattern in PATTERNS:
        # re.search returns None when nothing matches; append a blank
        # placeholder so every row keeps one field per pattern.
        reg_match = re.search(pattern, data)
        result.append(reg_match.group() if reg_match else " ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):
    def process(self, element):
        # datetime is imported here; the import at the top of the file failed.
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic", default=TOPIC)
    parser.add_argument("--output")
    # Flags not declared above (--runner, --streaming, ...) are passed to Beam.
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p
     | 'ReadData' >> beam.io.ReadFromPubSub(
         topic=known_args.input_topic).with_output_types(bytes)
     | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
     | "Clean Data" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
     )
    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Running the pipeline
We can run the pipeline in several different ways. If we wanted to, we could simply run it locally from a terminal, provided we are logged in to GCP remotely.
python main_pipeline_stream.py \
  --input_topic "projects/user-logs-237110/topics/userlogs" \
  --streaming
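The streaming script reads its own flags with parse_known_args, which matters here because the command line mixes script flags with runner flags. The standard-library sketch below shows how that call separates the two groups; split_args is a hypothetical wrapper written only for this illustration.

```python
import argparse


def split_args(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    # Returns (namespace of the declared flags, list of everything else).
    return parser.parse_known_args(argv)


known, remaining = split_args([
    "--input_topic", "projects/user-logs-237110/topics/userlogs",
    "--streaming",
])
print(known.input_topic)
print(remaining)
```

Anything the parser does not recognise, such as --streaming, is left in the second list instead of causing an error, so it can be handed on to the pipeline options.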
However, we are going to run it using DataFlow. We can do this with the command below, setting the following required parameters.
project - the ID of your GCP project.
runner - the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
staging_location - the Cloud Storage path where Cloud Dataflow stages the code packages needed by the workers executing the job.
temp_location - the Cloud Storage path where Cloud Dataflow stores the temporary job files created while the pipeline runs.
streaming - runs the pipeline in streaming mode.
python main_pipeline_stream.py \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp \
  --staging_location gs://$BUCKET/staging \
  --streaming
While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. When we click on the pipeline, we should see something like Figure 4. For debugging purposes, it can be very helpful to go to Logs and then to Stackdriver to view the detailed logs. This has helped me resolve pipeline issues on several occasions.
Figure 4: Beam pipeline
Accessing our data in BigQuery
So, we should now have a pipeline running with data flowing into our table. To test this, we can go to BigQuery and look at the data. After running the command below, you should see the first few rows of the dataset. Now that we have the data stored in BigQuery, we can do further analysis, as well as share the data with colleagues and start answering business questions.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5: BigQuery
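As a sketch of the kind of follow-up analysis this makes possible, the example below counts requests per status and the number of distinct users. It uses Python's built-in sqlite3 module as a local stand-in, not BigQuery, and the rows are invented; only the column names remote_addr and status come from the table schema.

```python
import sqlite3

# Invented rows; only the column names come from the table schema.
rows = [
    ("192.0.2.1", "200"),
    ("192.0.2.2", "200"),
    ("192.0.2.3", "404"),
    ("192.0.2.1", "500"),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logdata (remote_addr TEXT, status TEXT)")
conn.executemany("INSERT INTO logdata VALUES (?, ?)", rows)

# How many requests ended with each status code?
status_counts = conn.execute(
    "SELECT status, COUNT(*) FROM logdata GROUP BY status ORDER BY status"
).fetchall()

# How many distinct addresses sent requests?
unique_users = conn.execute(
    "SELECT COUNT(DISTINCT remote_addr) FROM logdata"
).fetchone()[0]
conn.close()

print(status_counts)
print(unique_users)
```

The same GROUP BY and COUNT(DISTINCT ...) queries should carry over to the BigQuery table with little change, although that is an assumption I have not verified here.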
Conclusion
We hope this post serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this format gives us many advantages. We can now start answering important questions such as: how many people use our product? Is the user base growing over time? Which parts of the product do people interact with most? And are there errors where there should not be? These are the questions that will interest the organization. Based on the insights that emerge from the answers, we can improve the product and increase user engagement.
Beam is really useful for this type of exercise and has a number of other interesting use cases. For example, you may want to analyze stock tick data in real time and make trades based on the analysis, or perhaps you have sensor data coming in from vehicles and want to calculate traffic levels. You could also, for example, be a games company that collects user data and uses it to build dashboards for tracking key metrics. Okay, folks, that is a topic for another post. Thanks for reading, and for those who want to see the full code, below is a link to my GitHub.
That's all.
Source: www.habr.com