Hi uile. Tha sinn a’ roinn eadar-theangachadh a’ phàirt mu dheireadh den artaigil, a chaidh ullachadh gu sònraichte airson oileanaich a’ chùrsa.
Apache Beam agus DataFlow airson Pìoban Fìor-ùine
A’ stèidheachadh Google Cloud
Nota: Chleachd mi Google Cloud Shell gus an loidhne-phìoban a ruith agus dàta log àbhaisteach fhoillseachadh oir bha duilgheadas agam a bhith a’ ruith na loidhne-phìoban ann am Python 3. Bidh Google Cloud Shell a’ cleachdadh Python 2, a tha nas cunbhalaiche ri Apache Beam.
Gus an loidhne-phìoban a thòiseachadh, feumaidh sinn beagan a chladhach a-steach do na roghainnean. Dhaibhsan agaibhse nach do chleachd GCP roimhe seo, feumaidh tu na 6 ceumannan a leanas a tha air am mìneachadh ann an seo a leantainn
Às deidh seo, feumaidh sinn na sgriobtaichean againn a luchdachadh suas gu Google Cloud Storage agus an lethbhreac gu ar Google Cloud Shel. Tha luchdachadh suas gu stòradh neòil gu math beag (gheibhear tuairisgeul
Figear 2
Tha na h-òrdughan a dh’ fheumas sinn airson na faidhlichean a chopaigeadh agus na leabharlannan riatanach a stàladh air an liostadh gu h-ìosal.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Cruthachadh ar stòr-dàta agus clàr
Aon uair ‘s gu bheil sinn air na ceumannan co-cheangailte ri stèidheachadh a chrìochnachadh, is e an ath rud a dh’ fheumas sinn a dhèanamh a bhith a’ cruthachadh stòr-dàta agus clàr ann am BigQuery. Tha grunn dhòighean ann seo a dhèanamh, ach is e an rud as sìmplidh consol Google Cloud a chleachdadh le bhith a’ cruthachadh stòr-dàta an-toiseach. Faodaidh tu na ceumannan gu h-ìosal a leantainn
Рисунок 3. Схема таблицы
Foillsich dàta log luchd-cleachdaidh
Tha Pub/Sub na phàirt riatanach den loidhne-phìoban againn oir leigidh e le grunn thagraidhean neo-eisimeileach conaltradh le chèile. Gu sònraichte, bidh e ag obair mar eadar-mheadhanair a leigeas leinn teachdaireachdan a chuir agus fhaighinn eadar tagraidhean. Is e a’ chiad rud a dh’fheumas sinn a dhèanamh cuspair a chruthachadh. Dìreach rachaibh gu Pub/Sub sa chonsail agus cliog air CREATE TOPIC.
Bidh an còd gu h-ìosal a’ gairm ar sgriobt gus an dàta log a tha air a mhìneachadh gu h-àrd a ghineadh agus an uairsin a’ ceangal agus a ’cur na logaichean gu Pub / Sub. Is e an aon rud a dh'fheumas sinn a dhèanamh rud a chruthachadh FoillsichearClient, sònraich an t-slighe chun chuspair a’ cleachdadh an dòigh topic_path
agus gairm an gnìomh publish
с topic_path
agus dàta. Thoir an aire gu bheil sinn a’ toirt a-steach generate_log_line
o'n sgriobtuir stream_logs
, mar sin dèan cinnteach gu bheil na faidhlichean sin san aon phasgan, air neo gheibh thu mearachd in-mhalairt. Faodaidh sinn an uairsin seo a ruith tron chonsal google againn a’ cleachdadh:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time
PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)
def publish(publisher, topic, message):
data = message.encode('utf-8')
return publisher.publish(topic_path, data = data)
def callback(message_future):
# When timeout is unspecified, the exception method waits indefinitely.
if message_future.exception(timeout=30):
print('Publishing message on {} threw an Exception {}.'.format(
topic_name, message_future.exception()))
else:
print(message_future.result())
if __name__ == '__main__':
while True:
line = generate_log_line()
print(line)
message_future = publish(publisher, topic_path, line)
message_future.add_done_callback(callback)
sleep_time = random.choice(range(1, 3, 1))
time.sleep(sleep_time)
Cho luath ‘s a bhios am faidhle a’ ruith, bidh e comasach dhuinn toradh an dàta log fhaicinn chun consol, mar a chithear san fhigear gu h-ìosal. Obraichidh an sgriobt seo cho fad 's nach cleachd sinn CTRL + Cgus a chrìochnachadh.
Figear 4. Toradh publish_logs.py
Sgrìobhadh ar còd loidhne-phìoban
A-nis gu bheil a h-uile càil air ullachadh, is urrainn dhuinn am pàirt spòrsail a thòiseachadh - a’ còdadh ar loidhne-phìoban a’ cleachdadh Beam agus Python. Gus loidhne-phìoban Beam a chruthachadh, feumaidh sinn rud loidhne-phìoban a chruthachadh (p). Aon uair ‘s gu bheil sinn air rud loidhne-phìoban a chruthachadh, is urrainn dhuinn grunn ghnìomhan a chuir an sàs aon às deidh a chèile a’ cleachdadh a ’ghnìomhaiche pipe (|)
. San fharsaingeachd, tha an sruth-obrach coltach ris an ìomhaigh gu h-ìosal.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
Anns a’ chòd againn, cruthaichidh sinn dà ghnìomh àbhaisteach. Gnìomh regex_clean
, a bhios a’ sganadh an dàta agus a’ faighinn air ais an t-sreath fhreagarrach stèidhichte air liosta PATTERNS a’ cleachdadh a’ ghnìomh re.search
. Bidh an gnìomh a’ tilleadh sreang sgaraichte le cromag. Mura h-eil thu nad eòlaiche faireachdainn cunbhalach, tha mi a’ moladh sùil a thoirt air seo datetime
taobh a-staigh gnìomh gus toirt air obrachadh. Bha mi a’ faighinn mearachd in-mhalairt aig toiseach an fhaidhle, rud a bha neònach. Thèid an liosta seo an uairsin a thoirt don ghnìomh WriteToBigQuery, a tha dìreach a’ cur ar dàta ris a’ chlàr. Tha an còd airson Batch DataFlow Job agus Streaming DataFlow Job air a thoirt seachad gu h-ìosal. Is e an aon eadar-dhealachadh eadar baidse agus còd sruthadh gun leugh sinn an CSV ann am baidse src_path
a’ cleachdadh a’ ghnìomh ReadFromText
bho Beam.
Iob Batch DataFlow (giollachd baidse)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys
PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'status': element[3],
'body_bytes_sent': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main():
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.textio.ReadFromText(src_path)
| "clean address" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
p.run()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
Sruth DataFlow Job (giollachd sruthan)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re
PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"
def regex_clean(data):
PATTERNS = [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
result = []
for match in PATTERNS:
try:
reg_match = re.search(match, data).group()
if reg_match:
result.append(reg_match)
else:
result.append(" ")
except:
print("There was an error with the regex search")
result = [x.strip() for x in result]
result = [x.replace('"', "") for x in result]
res = ','.join(result)
return res
class Split(beam.DoFn):
def process(self, element):
from datetime import datetime
element = element.split(",")
d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
date_string = d.strftime("%Y-%m-%d %H:%M:%S")
return [{
'remote_addr': element[0],
'timelocal': date_string,
'request_type': element[2],
'body_bytes_sent': element[3],
'status': element[4],
'http_referer': element[5],
'http_user_agent': element[6]
}]
def main(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)
p = beam.Pipeline(options=PipelineOptions())
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| "Clean Data" >> beam.Map(regex_clean)
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logger = logging.getLogger().setLevel(logging.INFO)
main()
A 'tòiseachadh air a' ghiùlan
Faodaidh sinn an loidhne-phìoban a ruith ann an grunn dhòighean eadar-dhealaichte. Nam biodh sinn ag iarraidh, b’ urrainn dhuinn a ruith gu h-ionadail bho cheann-uidhe fhad ‘s a bha sinn a’ logadh a-steach gu GCP air astar.
python -m main_pipeline_stream.py
--input_topic "projects/user-logs-237110/topics/userlogs"
--streaming
Ach, tha sinn gu bhith ga ruith a’ cleachdadh DataFlow. Is urrainn dhuinn seo a dhèanamh leis an àithne gu h-ìosal le bhith a’ suidheachadh nam paramadairean riatanach a leanas.
project
- ID den phròiseact GCP agad.runner
na ruitheadair loidhne-phìoban a nì sgrùdadh air a’ phrògram agad agus a thogas do loidhne-phìoban. Gus ruith san sgòth, feumaidh tu DataflowRunner a shònrachadh.staging_location
- an t-slighe gu stòradh sgòthan Cloud Dataflow airson pasganan còd clàr-amais a dh ’fheumas na pròiseasairean a tha a’ coileanadh na h-obrach.temp_location
- slighe gu stòradh neòil Cloud Dataflow airson faidhlichean obrach sealach a stòradh fhad ‘s a tha an loidhne-phìoban a’ ruith.streaming
python main_pipeline_stream.py
--runner DataFlow
--project $PROJECT
--temp_location $BUCKET/tmp
--staging_location $BUCKET/staging
--streaming
Fhad ‘s a tha an àithne seo a’ ruith, is urrainn dhuinn a dhol chun taba DataFlow ann an consol google agus coimhead air an loidhne-phìoban againn. Nuair a phutas sinn air an loidhne-phìoban, bu chòir dhuinn rudeigin coltach ri Figear 4 fhaicinn. Airson adhbharan debugging, faodaidh e a bhith gu math cuideachail a dhol gu Logaichean agus an uairsin gu Stackdriver gus na logaichean mionaideach fhaicinn. Tha seo air mo chuideachadh le bhith a’ fuasgladh chùisean loidhne-phìoban ann an grunn chùisean.
Figear 4: Beam ghiùladair
Faigh cothrom air an dàta againn ann am BigQuery
Mar sin, bu chòir loidhne-phìoban a bhith againn mu thràth le dàta a’ sruthadh a-steach don bhòrd againn. Gus seo a dhearbhadh, is urrainn dhuinn a dhol gu BigQuery agus coimhead air an dàta. Às deidh dhut an àithne gu h-ìosal a chleachdadh bu chòir dhut a’ chiad beagan shreathan den stòr-dàta fhaicinn. A-nis gu bheil an dàta againn air a stòradh ann am BigQuery, is urrainn dhuinn tuilleadh sgrùdaidh a dhèanamh, a bharrachd air an dàta a cho-roinn le co-obraichean agus tòiseachadh air ceistean gnìomhachais a fhreagairt.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figear 5: BigQuery
co-dhùnadh
Tha sinn an dòchas gum bi an dreuchd seo na eisimpleir feumail de bhith a’ cruthachadh loidhne-phìoban dàta sruthadh, a bharrachd air dòighean a lorg gus dàta a dhèanamh nas ruigsinneach. Bheir stòradh dàta sa chruth seo mòran bhuannachdan dhuinn. A-nis is urrainn dhuinn tòiseachadh air ceistean cudromach a fhreagairt mar cia mheud duine a bhios a’ cleachdadh ar toradh? A bheil do bhunait luchd-cleachdaidh a’ fàs thar ùine? Dè na taobhan den toradh a bhios daoine ag eadar-obrachadh leis as motha? Agus a bheil mearachdan ann far nach bu chòir a bhith ann? Is iad seo na ceistean a bhios inntinneach don bhuidheann. Stèidhichte air na beachdan a tha a’ nochdadh bho fhreagairtean nan ceistean sin, is urrainn dhuinn an toradh adhartachadh agus conaltradh luchd-cleachdaidh a mheudachadh.
Tha beam air leth feumail airson an seòrsa eacarsaich seo agus tha grunn chùisean cleachdaidh inntinneach eile ann cuideachd. Mar eisimpleir, is dòcha gum bi thu airson mion-sgrùdadh a dhèanamh air dàta strìochag stoc ann an àm fìor agus ciùird a dhèanamh stèidhichte air an anailis, is dòcha gu bheil dàta mothachaidh agad a’ tighinn bho charbadan agus gu bheil thu airson àireamhachadh ìre trafaic obrachadh a-mach. Dh’ fhaodadh tu cuideachd, mar eisimpleir, a bhith nad chompanaidh gèam a bhios a’ cruinneachadh dàta luchd-cleachdaidh agus ga chleachdadh gus deas-bhòrdan a chruthachadh gus sùil a chumail air prìomh mheatrics. Ceart gu leòr, a dhaoine uaisle, is e cuspair a tha seo airson post eile, taing airson leughadh, agus dhaibhsan a tha airson an còd slàn fhaicinn, gu h-ìosal tha an ceangal gu mo GitHub.
Tha sin uile.
Source: www.habr.com