Kumusta tanan. Among gipaambit ang hubad sa kataposang bahin sa artikulo, nga espesipikong giandam alang sa mga estudyante sa kurso.
Apache Beam ug DataFlow alang sa Real-Time nga mga Pipeline
Pag-set up sa Google Cloud
Mubo nga sulat: Gigamit nako ang Google Cloud Shell sa pagpadagan sa pipeline ug pagmantala sa custom log data tungod kay naglisud ko sa pagpadagan sa pipeline sa Python 3. Ang Google Cloud Shell naggamit sa Python 2, nga mas nahiuyon sa Apache Beam.
Aron masugdan ang pipeline, kinahanglan namon nga magkalot og gamay sa mga setting. Para nimo nga wala pa mogamit sa GCP kaniadto, kinahanglan nimong sundon ang mosunod nga 6 nga mga lakang nga gilatid niini
Pagkahuman niini, kinahanglan namong i-upload ang among mga script sa Google Cloud Storage ug kopyahon kini sa among Google Cloud Shel. Ang pag-upload sa cloud storage kay gamay ra (usa ka paghulagway ang makit-an
Hulagway 2
Ang mga sugo nga kinahanglan natong kopyahon ang mga file ug i-install ang gikinahanglan nga mga librarya gilista sa ubos.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Paghimo sa among database ug lamesa
Kung nahuman na namo ang tanan nga mga lakang nga may kalabutan sa pag-setup, ang sunod nga kinahanglan namon buhaton mao ang paghimo usa ka dataset ug lamesa sa BigQuery. Adunay ubay-ubay nga mga paagi sa pagbuhat niini, apan ang pinakasimple mao ang paggamit sa Google Cloud console pinaagi sa paghimo una og dataset. Mahimo nimong sundon ang mga lakang sa ubos
Figure 3. Layout sa lamesa
Pagmantala sa datos sa log sa gumagamit
Ang Pub/Sub usa ka kritikal nga bahin sa among pipeline tungod kay gitugotan niini ang daghang mga independente nga aplikasyon nga makigkomunikar sa usag usa. Sa partikular, kini naglihok isip tigpataliwala nga nagtugot kanamo sa pagpadala ug pagdawat sa mga mensahe tali sa mga aplikasyon. Ang unang butang nga kinahanglan natong buhaton mao ang paghimo og usa ka hilisgutan. Adto lang sa Pub/Sub sa console ug i-klik PAGHIMO TOPIC.
Ang code sa ubos nagtawag sa among script aron makamugna ang data sa log nga gihubit sa ibabaw ug dayon nagkonektar ug nagpadala sa mga troso sa Pub/Sub. Ang kinahanglan ra natong buhaton mao ang paghimo og butang Kliyente sa Publisher, ipiho ang dalan padulong sa hilisgutan gamit ang pamaagi topic_path
ug tawagan ang function publish
Ρ topic_path
ug data. Palihug timan-i nga kami nag-import generate_log_line
gikan sa among script stream_logs
, busa siguruha nga kini nga mga file naa sa parehas nga folder, kung dili makakuha ka usa ka sayup sa pag-import. Mahimo namong ipadagan kini pinaagi sa among google console gamit ang:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Sa diha nga ang file modagan, atong makita ang output sa log data ngadto sa console, sama sa gipakita sa hulagway sa ubos. Kini nga script molihok basta dili kami mogamit CTRL + Caron makompleto kini.
Hulagway 4. Output publish_logs.py
Pagsulat sa among pipeline code
Karon nga naandam na namo ang tanan, makasugod na kami sa makalingaw nga bahin - pag-coding sa among pipeline gamit ang Beam ug Python. Aron makamugna ug Beam pipeline, kinahanglang maghimo kita ug pipeline object (p). Sa higayon nga nakahimo na kami og pipeline nga butang, mahimo namong magamit ang daghang mga function sa usag usa gamit ang operator pipe (|)
. Sa kinatibuk-an, ang workflow sama sa imahe sa ubos.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Sa among code, maghimo kami og duha ka custom functions. Kalihokan regex_clean
, nga nag-scan sa datos ug nagkuha sa katugbang nga laray base sa lista sa PATTERNS gamit ang function re.search
. Ang function nagbalik sa usa ka comma separated string. Kung dili ka usa ka eksperto sa regular nga ekspresyon, girekomenda nako nga susihon kini datetime
sulod sa usa ka function aron kini molihok. Nakakuha ako usa ka sayup sa pag-import sa sinugdanan sa file, nga katingad-an. Kini nga lista ipasa sa function WriteToBigQuery, nga nagdugang lang sa among datos sa lamesa. Ang code alang sa Batch DataFlow Job ug Streaming DataFlow Job gihatag sa ubos. Ang bugtong kalainan tali sa batch ug streaming code mao nga sa batch atong gibasa ang CSV gikan sa src_path
gamit ang function ReadFromText
gikan sa Beam.
Batch DataFlow Job (pagproseso sa batch)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Pag-stream sa DataFlow Job (pagproseso sa sapa)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Pagsugod sa conveyor
Mahimo namon nga mapadagan ang pipeline sa lainlaing mga paagi. Kung gusto namon, mahimo ra namon kini nga ipadagan sa lokal gikan sa usa ka terminal samtang nag-log in sa GCP sa layo.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Bisan pa, ipadagan namon kini gamit ang DataFlow. Mahimo nato kini gamit ang ubos nga sugo pinaagi sa pagtakda sa mosunod nga gikinahanglan nga mga parameter.
project
β ID sa imong proyekto sa GCP.runner
usa ka pipeline runner nga mag-analisar sa imong programa ug magtukod sa imong pipeline. Aron makadagan sa panganod, kinahanglan nimong itakda ang usa ka DataflowRunner.staging_location
β ang agianan padulong sa Cloud Dataflow cloud storage alang sa pag-index sa mga pakete sa code nga gikinahanglan sa mga processor nga nagpahigayon sa trabaho.temp_location
β agianan padulong sa Cloud Dataflow cloud storage para sa pagtipig sa temporaryo nga mga file sa trabaho nga gihimo samtang ang pipeline nagdagan.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Samtang nagdagan kini nga sugo, makaadto kami sa tab nga DataFlow sa google console ug tan-awon ang among pipeline. Kung mag-klik kami sa pipeline, kinahanglan namon nga makita ang usa ka butang nga susama sa Figure 4. Alang sa mga katuyoan sa pag-debug, makatabang kaayo nga moadto sa Logs ug dayon sa Stackdriver aron makita ang detalyado nga mga troso. Nakatabang kini kanako nga masulbad ang mga isyu sa pipeline sa daghang mga kaso.
Hulagway 4: Beam conveyor
I-access ang among data sa BigQuery
Busa, kinahanglan na kita nga adunay usa ka pipeline nga nagdagan nga adunay mga datos nga nagdagayday sa atong lamesa. Aron sulayan kini, makaadto kami sa BigQuery ug tan-awon ang datos. Human sa paggamit sa sugo sa ubos kinahanglan nimo nga makita ang unang pipila ka laray sa dataset. Karon nga naa na namo ang data nga gitipigan sa BigQuery, mahimo namong ipahigayon ang dugang nga pagtuki, ingon man ipaambit ang datos sa mga kauban ug magsugod sa pagtubag sa mga pangutana sa negosyo.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Hulagway 5: BigQuery
konklusyon
Kami nanghinaut nga kini nga post magsilbi nga usa ka mapuslanon nga panig-ingnan sa paghimo sa usa ka streaming data pipeline, ingon man usab sa pagpangita og mga paagi aron mahimo ang datos nga mas dali ma-access. Ang pagtipig sa datos sa kini nga format naghatag kanamo daghang mga bentaha. Karon makasugod na ta sa pagtubag sa importanteng mga pangutana sama sa pila ka tawo ang naggamit sa atong produkto? Nagtubo ba ang imong base sa tiggamit sa paglabay sa panahon? Unsa nga mga aspeto sa produkto ang labing nakig-uban sa mga tawo? Ug naa bay mga sayup nga wala kinahanglana? Kini ang mga pangutana nga makapainteres sa organisasyon. Pinasukad sa mga panabut nga mitumaw gikan sa mga tubag sa kini nga mga pangutana, mahimo namon nga mapaayo ang produkto ug madugangan ang pakiglambigit sa gumagamit.
Ang Beam mapuslanon kaayo alang sa kini nga klase sa ehersisyo ug adunay daghang uban pang makapaikag nga mga kaso sa paggamit usab. Pananglitan, mahimo nimong analisahon ang datos sa stock tick sa tinuud nga oras ug maghimo mga patigayon base sa pag-analisar, tingali adunay ka data sa sensor nga gikan sa mga salakyanan ug gusto nimo kuwentahon ang mga kalkulasyon sa lebel sa trapiko. Mahimo ka usab, pananglitan, usa ka kompanya sa dula nga nagkolekta sa datos sa gumagamit ug gigamit kini aron maghimo mga dashboard aron masubay ang mga hinungdan nga sukatan. Okay, mga ginoo, kini usa ka hilisgutan alang sa lain nga post, salamat sa pagbasa, ug alang sa gusto nga makita ang tibuuk nga code, sa ubos ang link sa akong GitHub.
Mao ra na.
Source: www.habr.com