అందరికి వందనాలు. మేము వ్యాసం యొక్క చివరి భాగం యొక్క అనువాదాన్ని భాగస్వామ్యం చేస్తున్నాము, ఇది కోర్సులోని విద్యార్థుల కోసం ప్రత్యేకంగా సిద్ధం చేయబడింది.
రియల్-టైమ్ పైప్లైన్ల కోసం అపాచీ బీమ్ మరియు డేటాఫ్లో
Google క్లౌడ్ని సెటప్ చేస్తోంది
గమనిక: పైథాన్ 3లో పైప్లైన్ను రన్ చేయడంలో నాకు సమస్య ఉన్నందున పైప్లైన్ను అమలు చేయడానికి మరియు అనుకూల లాగ్ డేటాను ప్రచురించడానికి నేను Google Cloud Shellని ఉపయోగించాను. Google Cloud Shell Apache Beamతో మరింత స్థిరంగా ఉండే Python 2ని ఉపయోగిస్తుంది.
పైప్లైన్ను ప్రారంభించడానికి, మేము సెట్టింగులలో కొద్దిగా త్రవ్వాలి. మీలో ఇంతకు ముందు GCP ఉపయోగించని వారి కోసం, మీరు ఇందులో వివరించిన క్రింది 6 దశలను అనుసరించాలి
దీని తర్వాత, మేము మా స్క్రిప్ట్లను Google క్లౌడ్ స్టోరేజ్కి అప్లోడ్ చేయాలి మరియు వాటిని మా Google క్లౌడ్ షెల్కి కాపీ చేయాలి. క్లౌడ్ నిల్వకు అప్లోడ్ చేయడం చాలా చిన్న విషయం (వివరణ కనుగొనవచ్చు
మూర్తి 2
ఫైల్లను కాపీ చేసి, అవసరమైన లైబ్రరీలను ఇన్స్టాల్ చేయడానికి మనకు అవసరమైన ఆదేశాలు క్రింద ఇవ్వబడ్డాయి.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
మా డేటాబేస్ మరియు పట్టికను సృష్టిస్తోంది
మేము అన్ని సెటప్ సంబంధిత దశలను పూర్తి చేసిన తర్వాత, మేము చేయవలసిన తదుపరి పని BigQueryలో డేటాసెట్ మరియు పట్టికను సృష్టించడం. దీన్ని చేయడానికి అనేక మార్గాలు ఉన్నాయి, అయితే ముందుగా డేటాసెట్ను సృష్టించడం ద్వారా Google క్లౌడ్ కన్సోల్ను ఉపయోగించడం చాలా సరళమైనది. మీరు క్రింది దశలను అనుసరించవచ్చు
మూర్తి 3. టేబుల్ లేఅవుట్
వినియోగదారు లాగ్ డేటాను ప్రచురించడం
పబ్/సబ్ అనేది మా పైప్లైన్లో కీలకమైన భాగం ఎందుకంటే ఇది బహుళ స్వతంత్ర అప్లికేషన్లను ఒకదానితో ఒకటి కమ్యూనికేట్ చేయడానికి అనుమతిస్తుంది. ప్రత్యేకంగా, ఇది అప్లికేషన్ల మధ్య సందేశాలను పంపడానికి మరియు స్వీకరించడానికి మమ్మల్ని అనుమతించే మధ్యవర్తిగా పనిచేస్తుంది. మేము చేయవలసిన మొదటి విషయం ఏమిటంటే ఒక అంశాన్ని సృష్టించడం. కన్సోల్లోని పబ్/సబ్కి వెళ్లి, టాపిక్ సృష్టించు క్లిక్ చేయండి.
దిగువన ఉన్న కోడ్ పైన నిర్వచించిన లాగ్ డేటాను రూపొందించడానికి మా స్క్రిప్ట్ని పిలుస్తుంది మరియు ఆపై లాగ్లను కనెక్ట్ చేసి పబ్/సబ్కి పంపుతుంది. మనం చేయవలసింది ఒక వస్తువును సృష్టించడమే పబ్లిషర్ క్లయింట్, పద్ధతిని ఉపయోగించి అంశానికి మార్గాన్ని పేర్కొనండి topic_path
మరియు ఫంక్షన్కు కాల్ చేయండి publish
с topic_path
మరియు డేటా. మేము దిగుమతి చేసుకుంటున్నామని దయచేసి గమనించండి generate_log_line
మా స్క్రిప్ట్ నుండి stream_logs
, కాబట్టి ఈ ఫైల్లు ఒకే ఫోల్డర్లో ఉన్నాయని నిర్ధారించుకోండి, లేకుంటే మీరు దిగుమతి ఎర్రర్ను పొందుతారు. మేము దీన్ని ఉపయోగించి మా గూగుల్ కన్సోల్ ద్వారా దీన్ని అమలు చేయవచ్చు:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
ఫైల్ రన్ అయిన వెంటనే, దిగువ చిత్రంలో చూపిన విధంగా, కన్సోల్కు లాగ్ డేటా యొక్క అవుట్పుట్ను చూడగలుగుతాము. మనం ఉపయోగించనంత కాలం ఈ స్క్రిప్ట్ పని చేస్తుంది CTRL + Cదాన్ని పూర్తి చేయడానికి.
మూర్తి 4. అవుట్పుట్ publish_logs.py
Написание кода нашего конвейера
ఇప్పుడు మేము ప్రతిదీ సిద్ధం చేసాము, మేము సరదా భాగాన్ని ప్రారంభించవచ్చు - బీమ్ మరియు పైథాన్ ఉపయోగించి మా పైప్లైన్ కోడింగ్. బీమ్ పైప్లైన్ను రూపొందించడానికి, మేము పైప్లైన్ వస్తువు (p)ని సృష్టించాలి. మేము పైప్లైన్ ఆబ్జెక్ట్ను సృష్టించిన తర్వాత, ఆపరేటర్ని ఉపయోగించి మనం బహుళ ఫంక్షన్లను ఒకదాని తర్వాత ఒకటి వర్తింపజేయవచ్చు pipe (|)
. సాధారణంగా, వర్క్ఫ్లో క్రింద ఉన్న చిత్రం వలె కనిపిస్తుంది.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
మా కోడ్లో, మేము రెండు అనుకూల ఫంక్షన్లను సృష్టిస్తాము. ఫంక్షన్ regex_clean
, ఇది డేటాను స్కాన్ చేస్తుంది మరియు ఫంక్షన్ని ఉపయోగించి PATTERNS జాబితా ఆధారంగా సంబంధిత అడ్డు వరుసను తిరిగి పొందుతుంది re.search
. ఫంక్షన్ కామాతో వేరు చేయబడిన స్ట్రింగ్ను అందిస్తుంది. మీరు సాధారణ వ్యక్తీకరణ నిపుణుడు కాకపోతే, దీన్ని తనిఖీ చేయాలని నేను సిఫార్సు చేస్తున్నాను datetime
అది పని చేయడానికి ఒక ఫంక్షన్ లోపల. నేను ఫైల్ ప్రారంభంలో దిగుమతి ఎర్రర్ను పొందుతున్నాను, ఇది విచిత్రంగా ఉంది. ఈ జాబితా తర్వాత ఫంక్షన్కు పంపబడుతుంది WriteToBigQuery, ఇది కేవలం మన డేటాను టేబుల్కి జోడిస్తుంది. బ్యాచ్ డేటాఫ్లో జాబ్ మరియు స్ట్రీమింగ్ డేటాఫ్లో జాబ్ కోసం కోడ్ క్రింద ఇవ్వబడింది. బ్యాచ్ మరియు స్ట్రీమింగ్ కోడ్ మధ్య ఉన్న ఏకైక తేడా ఏమిటంటే, బ్యాచ్లో మనం CSVని చదవడం src_path
ఫంక్షన్ ఉపయోగించి ReadFromText
బీమ్ నుండి.
బ్యాచ్ డేటాఫ్లో జాబ్ (బ్యాచ్ ప్రాసెసింగ్)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
స్ట్రీమింగ్ డేటాఫ్లో జాబ్ (స్ట్రీమ్ ప్రాసెసింగ్)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
కన్వేయర్ను ప్రారంభిస్తోంది
మేము పైప్లైన్ను వివిధ మార్గాల్లో అమలు చేయవచ్చు. మనకు కావాలంటే, GCPకి రిమోట్గా లాగిన్ చేస్తున్నప్పుడు మేము దానిని టెర్మినల్ నుండి స్థానికంగా అమలు చేయవచ్చు.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
అయితే, మేము డేటాఫ్లో ఉపయోగించి దీన్ని అమలు చేయబోతున్నాము. కింది అవసరమైన పారామితులను సెట్ చేయడం ద్వారా దిగువ ఆదేశాన్ని ఉపయోగించి మనం దీన్ని చేయవచ్చు.
project
— మీ GCP ప్రాజెక్ట్ యొక్క ID.runner
మీ ప్రోగ్రామ్ను విశ్లేషించి, మీ పైప్లైన్ని నిర్మించే పైప్లైన్ రన్నర్. క్లౌడ్లో అమలు చేయడానికి, మీరు తప్పనిసరిగా డేటాఫ్లోరన్నర్ని పేర్కొనాలి.staging_location
— పనిని నిర్వహిస్తున్న ప్రాసెసర్లకు అవసరమైన కోడ్ ప్యాకేజీలను ఇండెక్సింగ్ చేయడానికి క్లౌడ్ డేటాఫ్లో క్లౌడ్ స్టోరేజ్కి మార్గం.temp_location
— పైప్లైన్ నడుస్తున్నప్పుడు సృష్టించబడిన తాత్కాలిక జాబ్ ఫైల్లను నిల్వ చేయడానికి క్లౌడ్ డేటాఫ్లో క్లౌడ్ స్టోరేజ్కి మార్గం.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
ఈ కమాండ్ రన్ అవుతున్నప్పుడు, మనం గూగుల్ కన్సోల్లోని డేటాఫ్లో ట్యాబ్కి వెళ్లి మన పైప్లైన్ను చూడవచ్చు. మనం పైప్లైన్పై క్లిక్ చేసినప్పుడు, మనకు మూర్తి 4 వంటిది కనిపిస్తుంది. డీబగ్గింగ్ ప్రయోజనాల కోసం, వివరణాత్మక లాగ్లను వీక్షించడానికి లాగ్లకు వెళ్లి ఆపై స్టాక్డ్రైవర్కు వెళ్లడం చాలా ఉపయోగకరంగా ఉంటుంది. ఇది అనేక సందర్భాల్లో పైప్లైన్ సమస్యలను పరిష్కరించడంలో నాకు సహాయపడింది.
మూర్తి 4: బీమ్ కన్వేయర్
BigQueryలో మా డేటాను యాక్సెస్ చేయండి
కాబట్టి, మన టేబుల్లోకి డేటా ప్రవహించే పైప్లైన్ ఇప్పటికే ఉండాలి. దీన్ని పరీక్షించడానికి, మేము BigQueryకి వెళ్లి డేటాను చూడవచ్చు. దిగువ ఆదేశాన్ని ఉపయోగించిన తర్వాత మీరు డేటాసెట్ యొక్క మొదటి కొన్ని వరుసలను చూడాలి. ఇప్పుడు మేము BigQueryలో నిల్వ చేసిన డేటాను కలిగి ఉన్నాము, మేము తదుపరి విశ్లేషణను నిర్వహించగలము, అలాగే డేటాను సహోద్యోగులతో పంచుకోవచ్చు మరియు వ్యాపార ప్రశ్నలకు సమాధానం ఇవ్వడం ప్రారంభించవచ్చు.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
మూర్తి 5: BigQuery
తీర్మానం
ఈ పోస్ట్ స్ట్రీమింగ్ డేటా పైప్లైన్ను రూపొందించడానికి, అలాగే డేటాను మరింత ప్రాప్యత చేయడానికి మార్గాలను కనుగొనడంలో ఉపయోగకరమైన ఉదాహరణగా ఉపయోగపడుతుందని మేము ఆశిస్తున్నాము. ఈ ఫార్మాట్లో డేటాను నిల్వ చేయడం వల్ల మనకు చాలా ప్రయోజనాలు లభిస్తాయి. ఇప్పుడు మన ఉత్పత్తిని ఎంత మంది వ్యక్తులు ఉపయోగిస్తున్నారు వంటి ముఖ్యమైన ప్రశ్నలకు సమాధానం ఇవ్వడం ప్రారంభించవచ్చు. కాలక్రమేణా మీ యూజర్ బేస్ పెరుగుతోందా? ఉత్పత్తి యొక్క ఏ అంశాలతో ప్రజలు ఎక్కువగా సంకర్షణ చెందుతారు? మరియు ఉండకూడని చోట లోపాలు ఉన్నాయా? సంస్థకు ఆసక్తి కలిగించే ప్రశ్నలు ఇవి. ఈ ప్రశ్నలకు సమాధానాల నుండి వెలువడే అంతర్దృష్టుల ఆధారంగా, మేము ఉత్పత్తిని మెరుగుపరచవచ్చు మరియు వినియోగదారు నిశ్చితార్థాన్ని పెంచవచ్చు.
ఈ రకమైన వ్యాయామానికి బీమ్ నిజంగా ఉపయోగకరంగా ఉంటుంది మరియు అనేక ఇతర ఆసక్తికరమైన ఉపయోగ సందర్భాలను కూడా కలిగి ఉంది. ఉదాహరణకు, మీరు స్టాక్ టిక్ డేటాను నిజ సమయంలో విశ్లేషించి, విశ్లేషణ ఆధారంగా ట్రేడ్లు చేయాలనుకోవచ్చు, బహుశా మీరు వాహనాల నుండి వచ్చే సెన్సార్ డేటాను కలిగి ఉండవచ్చు మరియు ట్రాఫిక్ స్థాయి గణనలను లెక్కించాలనుకోవచ్చు. మీరు, ఉదాహరణకు, వినియోగదారు డేటాను సేకరించి, కీ కొలమానాలను ట్రాక్ చేయడానికి డాష్బోర్డ్లను రూపొందించడానికి దాన్ని ఉపయోగించే గేమింగ్ కంపెనీగా కూడా ఉండవచ్చు. సరే, పెద్దమనుషులు, ఇది మరొక పోస్ట్ కోసం ఒక అంశం, చదివినందుకు ధన్యవాదాలు, మరియు పూర్తి కోడ్ని చూడాలనుకునే వారికి, నా GitHub లింక్ క్రింద ఉంది.
అంతే.
మూలం: www.habr.com