Hi everyone. We are sharing a translation of the final part of this article, prepared specifically for students of the course.
Apache Beam and DataFlow for Real-Time Pipelines
Setting up Google Cloud
Note: I used Google Cloud Shell to run the pipeline and publish the user log data because I was having problems running the pipeline in Python 3. Google Cloud Shell uses Python 2, which plays much better with Apache Beam.
To get the pipeline started, we need to dig into the setup a bit. For those of you who have not used GCP before, you will need to follow the six steps outlined here.
After that, we will need to upload our scripts to Google Cloud Storage and copy them over to our Google Cloud Shell. Uploading to cloud storage is fairly trivial (a description can be found here).

Figure 2
The commands we need to copy the files over and install the required libraries are listed below.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creating our database and table
Once we have completed all the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the easiest is to use the Google Cloud console and first create a dataset. You can follow the steps below to create a table and a schema.
Figure 3. Creating a table
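If you prefer the command line to the console, the same dataset and table can be created with the `bq` tool. This is a sketch assuming the dataset is named userlogs and the table logdata, matching the names used in the pipeline code later on:

```shell
# Create the dataset (name matches the one used later in the pipeline code)
bq mk userlogs

# Create the table with the same schema the pipeline writes
bq mk --table userlogs.logdata \
  remote_addr:STRING,timelocal:STRING,request_type:STRING,status:STRING,body_bytes_sent:STRING,http_referer:STRING,http_user_agent:STRING
```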
Publishing the user log data
Pub/Sub is an essential component of our pipeline because it allows multiple independent applications to interact with each other. Specifically, it acts as a middleman letting us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
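The topic can also be created from the command line with the gcloud CLI. A minimal sketch, assuming the topic name userlogs used throughout this article:

```shell
# Create the Pub/Sub topic the publisher script will write to
gcloud pubsub topics create userlogs
```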
The code below calls our script to generate the log data defined above, then connects to Pub/Sub and sends the logs to it. The only things we need to do are create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with the topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this from our Google console using:

python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time

PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            TOPIC, message_future.exception()))
    else:
        print(message_future.result())

if __name__ == '__main__':
    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
Once the file is running, we can watch the log data print to the console, as shown in the figure below. The script will keep running until we use CTRL + C to terminate it.

Figure 4. Output of publish_logs.py
Writing our pipeline code
Now that everything is set up, we can get to the fun part: coding our pipeline using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have created the pipeline object, we can apply multiple functions one after another using the pipe operator (|). In general, the workflow looks like the figure below.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
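To build intuition for how this chaining works, here is a minimal pure-Python sketch (not Beam itself; the class names are made up for illustration) that mimics the pattern by overloading the `__or__` operator, which is the same hook Beam uses to apply transforms to PCollections:

```python
class Transform:
    """Stand-in for a Beam transform: wraps a per-element function."""
    def __init__(self, fn):
        self.fn = fn

class FakePCollection:
    """Stand-in for a PCollection that supports `|` chaining via __or__."""
    def __init__(self, elements):
        self.elements = list(elements)

    def __or__(self, transform):
        # Applying a transform produces a new collection, as in Beam.
        return FakePCollection(transform.fn(x) for x in self.elements)

result = (FakePCollection([1, 2, 3])
          | Transform(lambda x: x * 10)
          | Transform(lambda x: x + 1))
print(result.elements)  # → [11, 21, 31]
```

Each `|` hands the output collection of one stage to the next, which is exactly the shape of the Beam pipelines below.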
In our code, we create two custom functions. The regex_clean function searches the data and extracts the string matching each entry in the PATTERNS list using the re.search function; it returns a comma-separated string. If you are not a regular-expression expert, I recommend checking out this tutorial and practicing in a notebook to test the code. After this, we define a custom ParDo function called Split, a class inheriting from beam.DoFn that splits the comma-separated string and returns a list of dictionaries whose keys match the columns of our BigQuery table. One thing to note about this function: I had to import datetime inside the function for it to work. I was getting an import error when importing at the top of the file, which was strange. This list is then passed to the WriteToBigQuery function, which simply appends our data to the table. The code for the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and streaming code is that in the batch job we read the CSV from src_path using Beam's ReadFromText function.
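As a small illustration of the lookbehind/lookahead idiom used in the PATTERNS list, the second pattern extracts whatever sits between square brackets, which is the timestamp in our logs. The sample line below is a made-up example in the same general shape as the generated logs, not actual output of stream_logs:

```python
import re

# Hypothetical log line in the same general shape as the generated logs
sample = '192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/list HTTP/1.1" [401] 155'

# Lookbehind (?<=\[) and lookahead (?=\]) grab the text between the first
# pair of square brackets without including the brackets themselves.
timestamp = re.search(r'(?<=\[).+?(?=\])', sample).group()
print(timestamp)  # → 30/Apr/2019:21:11:42
```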
Batch DataFlow Job (batch processing)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res
class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[4],
            'body_bytes_sent': element[3],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]
def main():
    p = beam.Pipeline(options=PipelineOptions())

    (p
        | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
        | "clean address" >> beam.Map(regex_clean)
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
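One detail worth isolating from the Split function above is the timestamp conversion: strptime parses the log-style timestamp and strftime re-emits it in the "YYYY-MM-DD HH:MM:SS" layout we store in BigQuery. The same conversion in isolation:

```python
from datetime import datetime

# Parse the log timestamp format and re-emit it in BigQuery-friendly form
d = datetime.strptime("30/Apr/2019:21:11:42", "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
print(date_string)  # → 2019-04-30 21:11:42
```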
Streaming DataFlow Job (stream processing)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res
class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]
def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions())

    (p
        | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
        | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
        | "Clean Data" >> beam.Map(regex_clean)
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
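Note the symmetry between the publisher and this pipeline: publish.py encodes each log line to UTF-8 bytes before sending, and the "Decode" step here reverses that. The round trip in isolation (the sample line is illustrative, not actual stream_logs output):

```python
# publish.py sends each line as UTF-8 bytes; the "Decode" step reverses it
line = '192.52.197.161 - - [30/Apr/2019:21:11:42] "GET / HTTP/1.1"'
data = line.encode('utf-8')      # what gets published to Pub/Sub
decoded = data.decode('utf-8')   # what the pipeline's Decode step recovers
print(decoded == line)  # → True
```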
Starting the pipeline

We can run the pipeline in several different ways. If we wanted to, we could simply run it locally from a terminal while logged in to GCP remotely.
python main_pipeline_stream.py \
--input_topic "projects/user-logs-237110/topics/userlogs" \
--streaming
However, we are going to run it on DataFlow. We can do that with the command below, setting the following required parameters:

project - the ID of your GCP project.
runner - the pipeline runner that parses your program and constructs your pipeline. To run in the cloud, you must specify DataflowRunner.
staging_location - a Cloud Dataflow Cloud Storage path where the code packages needed by the workers executing the job are staged.
temp_location - a Cloud Dataflow Cloud Storage path for staging temporary job files created while the pipeline is running.
streaming - a flag that tells Beam to run the pipeline in streaming mode.
python main_pipeline_stream.py \
--runner DataflowRunner \
--project $PROJECT \
--temp_location $BUCKET/tmp \
--staging_location $BUCKET/staging \
--streaming
While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. When we click into the pipeline, we should see something similar to Figure 4. For debugging, it can be very helpful to go to Logs and then to Stackdriver to view the detailed logs. This has helped me resolve pipeline issues on a number of occasions.

Figure 4: Beam pipeline
Accessing our data in BigQuery

So, we should now have a pipeline running with data flowing into our table. To test this, we can go to BigQuery and view the data. After running the query below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can carry out further analysis, share the data with colleagues, and start answering business questions.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5: BigQuery
Conclusion

Hopefully this article serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this format gives us many benefits. We can now start answering important questions such as: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with the most? And are there errors where there should not be? These are the questions an organization will care about, and based on the insights that come out of the answers, we can improve the product and increase user engagement.

Beam is genuinely useful for this kind of exercise, and there are a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on the analysis, or you might have sensor data coming in from vehicles and want to compute traffic levels. You could also, for example, be a gaming company that collects user data and uses it to build dashboards tracking key metrics. Okay folks, that is a topic for another post. Thanks for reading, and for those who want to see the full code, below is the link to my GitHub.
That is all.
Source: www.habr.com