Salama daholo. Mizara ny fandikana ny tapany farany amin'ny lahatsoratra izahay, izay nomanina manokana ho an'ny mpianatry ny taranja.
Apache Beam sy DataFlow ho an'ny fantsona tena fotoana
Fametrahana Google Cloud
Fanamarihana: Nampiasa Google Cloud Shell aho mba hampandehanana ny fantsona ary hamoaka ny angon-drakitra manokana satria sahirana amin'ny fampandehanana ny fantsona amin'ny Python 3. Google Cloud Shell dia mampiasa Python 2, izay mifanaraka kokoa amin'ny Apache Beam.
Mba hanombohana ny fantsona dia mila mihady kely amin'ny toe-javatra isika. Ho anao izay mbola tsy nampiasa GCP teo aloha, dia mila manaraka ireto dingana 6 manaraka ireto ianao
Aorian'izany dia mila mampakatra ny scripty any amin'ny Google Cloud Storage izahay ary mandika azy ireo any amin'ny Google Cloud Shel. Ny fampiakarana amin'ny fitahirizana rahona dia tsy dia misy dikany loatra (misy famaritana
2 Figure
Ny baiko ilaintsika handikana ireo rakitra sy fametrahana ireo tranomboky ilaina dia voatanisa etsy ambany.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Mamorona ny angonay sy ny latabatra
Rehefa vita ny dingana rehetra mifandraika amin'ny fananganana, ny zavatra manaraka tokony hataontsika dia ny mamorona angona sy latabatra ao amin'ny BigQuery. Misy fomba maro hanaovana izany, fa ny tsotra indrindra dia ny fampiasana ny Google Cloud Console amin'ny famoronana daty voalohany. Azonao atao ny manaraka ny dingana etsy ambany
Sary 3. Fandrafetana latabatra
Famoahana ny angon'ny diarin'ny mpampiasa
Ny Pub/Sub dia singa manan-danja amin'ny fantsonay satria mamela fampiharana tsy miankina maro hifampiresaka. Indrindra indrindra, miasa toy ny mpanelanelana ahafahantsika mandefa sy mandray hafatra eo anelanelan'ny fampiharana. Ny zavatra voalohany tokony hataontsika dia ny mamorona lohahevitra iray. Mandehana fotsiny amin'ny Pub/Sub ao amin'ny console ary tsindrio CREATE TOPIC.
Ny kaody etsy ambany dia miantso ny scripty mba hamoronana ny angon-drakitra voalaza etsy ambony ary avy eo mampifandray sy mandefa ny logs amin'ny Pub/Sub. Ny hany tokony hataontsika dia ny mamorona zavatra iray PublisherClient, mamaritra ny lalana mankany amin'ny lohahevitra mampiasa ny fomba topic_path
ary miantso ny asa publish
Ρ topic_path
ary data. Marihina fa manafatra izahay generate_log_line
avy amin'ny scripty stream_logs
, koa ataovy azo antoka fa ao amin'ny lahatahiry iray ihany ireo rakitra ireo, raha tsy izany dia hahazo hadisoana fanafarana ianao. Azontsika atao izany amin'ny alΓ lan'ny console google amin'ny fampiasana:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Raha vao mandeha ny rakitra dia ho hitantsika ny fivoahan'ny angon-drakitra mankany amin'ny console, araka ny aseho amin'ny sary etsy ambany. Ity script ity dia hiasa raha mbola tsy ampiasainay Ctrl + Chamita azy.
Sary 4. Output publish_logs.py
Manoratra ny kaodin-tsofinay
Efa voaomanantsika izao ny zava-drehetra, afaka manomboka ny ampahany mahafinaritra isika - fametahana ny fantsona amin'ny alΓ lan'ny Beam sy Python. Mba hamoronana fantsona Beam dia mila mamorona zavatra fantsona (p). Raha vao namorona zavatra pipeline isika, dia afaka mampihatra asa maro isan-karazany amin'ny fampiasana ny operator pipe (|)
. Amin'ny ankapobeny dia mitovy amin'ny sary etsy ambany ny fizotran'ny asa.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Amin'ny kaody ataontsika dia hamorona fiasa roa mahazatra isika. asa regex_clean
, izay mijery ny angon-drakitra ary maka ny laharana mifanaraka amin'izany mifototra amin'ny lisitry ny PATTERNS mampiasa ny fiasa re.search
. Ny fiasa dia mamerina tady misaraka faingo. Raha tsy manam-pahaizana manokana momba ny fanehoan-kevitra mahazatra ianao dia manoro hevitra aho hanamarina ity datetime
ao anatin'ny fiasa iray mba hampandeha azy. Nahazo fahadisoana fanafarana aho tany am-piandohan'ny rakitra, izay hafahafa. Ity lisitra ity dia alefa any amin'ny asa WriteToBigQuery, izay manampy fotsiny ny angonay amin'ny latabatra. Ny kaody ho an'ny Batch DataFlow Job sy ny Streaming DataFlow Job dia omena etsy ambany. Ny hany maha samy hafa ny batch sy ny streaming code dia ny batch mamaky ny CSV avy src_path
mampiasa ny asa ReadFromText
avy amin'ny Beam.
Batch DataFlow Job (fanodinana andiany)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Streaming DataFlow Job (fikarakarana ny stream)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Manomboka ny conveyor
Afaka mampandeha ny pipeline amin'ny fomba maro samihafa isika. Raha tiana dia azonay atao ny mampandeha azy eo an-toerana avy amin'ny terminal iray rehefa miditra amin'ny GCP lavitra.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Na izany aza, hampiasa izany amin'ny DataFlow izahay. Azontsika atao izany amin'ny alΓ lan'ny baiko etsy ambany amin'ny alΓ lan'ny fametrahana ireto fepetra takiana manaraka ireto.
project
- ID ny tetikasa GCP-nao.runner
dia mpihazakazaka fantsona izay hamakafaka ny programanao sy hanorina ny fantsonao. Raha te hihazakazaka amin'ny rahona ianao dia tsy maintsy mamaritra DataflowRunner.staging_location
β ny lalana mankany amin'ny fitehirizana rahona Cloud Dataflow ho an'ny fonosana kaody fanondroana ilain'ireo mpanodina manao ny asa.temp_location
- lalana mankany amin'ny fitehirizana rahona Cloud Dataflow ho fitehirizana ireo rakitra asa vonjimaika noforonina rehefa mandeha ny fantsona.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Raha mandeha io baiko io dia afaka mandeha any amin'ny tabilao DataFlow ao amin'ny console google isika ary mijery ny fantsonay. Rehefa manindry ny fantsona isika, dia tokony hahita zavatra mitovy amin'ny sary 4. Ho an'ny tanjona debug, dia tena manampy tokoa ny mandeha any amin'ny Logs ary avy eo mankany amin'ny Stackdriver raha hijery ireo logs amin'ny antsipiriany. Izany dia nanampy ahy hamaha ny olan'ny fantsona amin'ny tranga maromaro.
Sary 4: Beam conveyor
Midira ao amin'ny BigQuery ny angonay
Noho izany, tokony efa manana fantsona mandeha miaraka amin'ny angona mikoriana ao amin'ny latabatray isika. Mba hitsapana izany dia afaka mandeha any amin'ny BigQuery isika ary mijery ny angona. Rehefa avy mampiasa ny baiko etsy ambany ianao dia tokony hahita andalana vitsivitsy voalohany amin'ny angona. Rehefa manana ny angona voatahiry ao amin'ny BigQuery izahay izao, dia afaka manao famakafakana bebe kokoa, ary mizara ny angona amin'ny mpiara-miasa ary manomboka mamaly fanontaniana momba ny raharaham-barotra.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Sary 5: BigQuery
famaranana
Manantena izahay fa ity lahatsoratra ity dia ho ohatra mahasoa amin'ny famoronana fantsona angon-drakitra mivantana, ary koa fitadiavana fomba hahatonga ny angona ho mora idirana kokoa. Ny fitehirizana angona amin'ity endrika ity dia manome tombony maro ho antsika. Ankehitriny isika dia afaka manomboka mamaly fanontaniana manan-danja toy ny firy ny olona mampiasa ny vokatra? Mitombo ve ny mpampiasa anao rehefa mandeha ny fotoana? Inona avy ireo lafiny amin'ny vokatra ifandrimbonan'ny olona indrindra? Ary misy lesoka tsy tokony hisy ve? Ireo no fanontaniana hahaliana ny fikambanana. Miorina amin'ny fomba fijery mipoitra avy amin'ny valin'ireo fanontaniana ireo, afaka manatsara ny vokatra isika ary mampitombo ny fidiran'ny mpampiasa.
Beam dia tena ilaina amin'ity karazana fanazaran-tena ity ary manana tranga fampiasana mahaliana hafa koa. Ohatra, azonao atao ny manadihady ny angon-drakitra momba ny tahiry amin'ny fotoana tena izy ary manao varotra mifototra amin'ny famakafakana, angamba manana angona sensor avy amin'ny fiara ianao ary te hanao kajy ny haavon'ny fifamoivoizana. Azonao atao ihany koa, ohatra, ny ho orinasa lalao manangona angon-drakitra mpampiasa ary mampiasa azy io hamoronana dashboard hanaraha-maso ireo metrika fototra. Okay, tompokolahy, lohahevitra ho an'ny lahatsoratra hafa ity, misaotra amin'ny famakiana, ary ho an'ireo izay te-hahita ny kaody feno, eto ambany ny rohy mankany amin'ny GitHub-ko.
Izay ihany.
Source: www.habr.com