Hello everyone. We are sharing a translation of the final part of the article, prepared especially for students of the course.
Apache Beam and DataFlow for real-time pipelines
Setting up Google Cloud
Note: I used Google Cloud Shell to run the pipeline and publish the custom log data because I was having trouble running the pipeline in Python 3. Google Cloud Shell used Python 2, which at the time was more compatible with Apache Beam.
To start the pipeline, we need to dig into the settings a little. Those of you who have not used GCP before will need to follow the 6 steps described on this page.
After this, we will need to upload our scripts to Google Cloud Storage and copy them to our Google Cloud Shell. Uploading to Cloud Storage is fairly trivial (a description can be found here).
Figure 2
The commands we need to copy the files and install the required libraries are listed below.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install 'apache-beam[gcp]' oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creating our dataset and table
Once we have completed all the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console and create the dataset first. You can follow the steps below.
Figure 3. Table structure
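If you would rather script this step than click through the console, here is a minimal sketch using the google-cloud-bigquery client library. It assumes the dataset is called userlogs and the table logdata (the userlogs.logdata table the pipeline writes to later), that the library is installed, and that Cloud Shell credentials are available. parse_schema and create_dataset_and_table are hypothetical helper names; the schema string is the one used in the pipeline code.

```python
# Sketch: turn the Beam-style schema string into (name, TYPE) pairs and
# create the dataset and table with google-cloud-bigquery.
# Dataset/table names and helper names are assumptions for illustration.

SCHEMA = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, '
          'status:STRING, body_bytes_sent:STRING, http_referer:STRING, '
          'http_user_agent:STRING')


def parse_schema(schema_string):
    """Split 'name:TYPE, name:TYPE, ...' into a list of (name, TYPE) tuples."""
    fields = []
    for part in schema_string.split(','):
        name, field_type = part.strip().split(':')
        fields.append((name, field_type))
    return fields


def create_dataset_and_table(project, dataset_id='userlogs', table_id='logdata'):
    """Create the dataset and table if they do not exist (needs GCP credentials)."""
    # Imported lazily so parse_schema can be used without the library installed.
    from google.cloud import bigquery

    client = bigquery.Client(project=project)
    client.create_dataset('{}.{}'.format(project, dataset_id), exists_ok=True)
    schema = [bigquery.SchemaField(name, field_type)
              for name, field_type in parse_schema(SCHEMA)]
    table = bigquery.Table('{}.{}.{}'.format(project, dataset_id, table_id),
                           schema=schema)
    return client.create_table(table, exists_ok=True)


if __name__ == '__main__':
    for name, field_type in parse_schema(SCHEMA):
        print(name, field_type)
```

Calling create_dataset_and_table('user-logs-237110') would create both objects; exists_ok=True makes the call safe to repeat.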
Publishing user log data
Pub/Sub is a vital component of our pipeline because it allows multiple independent applications to communicate with each other. In particular, it acts as a middleman that lets us send and receive messages between applications. The first thing we need to do is create a topic. Simply go to Pub/Sub in the console and click CREATE TOPIC.
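The topic can also be created from code. Below is a minimal sketch with the google-cloud-pubsub client, assuming the project ID and topic name used in publish.py; topic_path_for and create_topic are hypothetical helpers. The fully qualified topic name has the form projects/<project>/topics/<topic>, the same form as TOPIC in the streaming job. From a shell, gcloud pubsub topics create userlogs does the same thing.

```python
# Sketch: create the Pub/Sub topic from code instead of the console.
# PROJECT_ID and TOPIC mirror publish.py; the helper names are hypothetical.

PROJECT_ID = 'user-logs-237110'
TOPIC = 'userlogs'


def topic_path_for(project_id, topic):
    """Build the fully qualified topic name that Pub/Sub expects."""
    return 'projects/{}/topics/{}'.format(project_id, topic)


def create_topic(project_id, topic):
    """Create the topic (needs google-cloud-pubsub and GCP credentials)."""
    # Imported lazily so topic_path_for works without the library installed.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    return publisher.create_topic(name=topic_path_for(project_id, topic))


if __name__ == '__main__':
    print(topic_path_for(PROJECT_ID, TOPIC))
```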
The code below calls our script to generate the log data described above, then connects to Pub/Sub and sends the logs there. All we need to do is create a PublisherClient object, specify the path to the topic using the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder; otherwise you will get an import error. We can then run it from our Google Cloud console using:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)


def publish(publisher, topic, message):
    # Pub/Sub messages are bytes, so encode the log line first.
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)


def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        # result() returns the ID the server assigned to the message.
        print(message_future.result())


if __name__ == '__main__':
    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        # Wait 1 or 2 seconds before sending the next line.
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
As soon as the file starts running, we can see the log data printed to the console, as shown in the figure below. The script keeps running until we stop it with CTRL+C.
Figure 4. Output of publish_logs.py
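publish.py imports generate_log_line from stream_logs, which was built in the previous part of the article and is not reproduced here. To try publish.py on its own, a hypothetical stand-in such as the one below could be used. It relies only on the standard library rather than Faker, and the line layout is an assumption: an access-log line with the status code in square brackets, which appears to be what the bracketed-number pattern in regex_clean looks for.

```python
# Hypothetical stand-in for stream_logs.generate_log_line (not shown in this
# part). Standard library only; the line layout is an assumption.
import random
from datetime import datetime

REQUEST_TYPES = ['GET', 'POST', 'PUT', 'DELETE']
STATUSES = ['200', '404', '500', '301']
REFERERS = ['http://www.example.com', 'https://www.example.org']
USER_AGENTS = ['Mozilla/5.0 (X11; Linux x86_64)', 'Mozilla/5.0 (Windows NT 10.0)']

LINE = ('{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" '
        '[{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"')


def generate_log_line(now=None):
    """Return one fake access-log line stamped with `now` (default: current time)."""
    if now is None:
        now = datetime.now()
    return LINE.format(
        remote_addr='.'.join(str(random.randint(1, 254)) for _ in range(4)),
        time_local=now.strftime('%d/%b/%Y:%H:%M:%S'),
        request_type=random.choice(REQUEST_TYPES),
        request_path='/' + random.choice(['home', 'login', 'cart', 'search']),
        status=random.choice(STATUSES),
        body_bytes_sent=random.randint(100, 10000),
        http_referer=random.choice(REFERERS),
        http_user_agent=random.choice(USER_AGENTS),
    )


if __name__ == '__main__':
    print(generate_log_line())
```

Saved as stream_logs.py next to publish.py, it would let the publisher run end to end, though the real generator may produce differently shaped lines.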
Writing our pipeline code
Now that we have everything set up, we can get to the fun part: coding our pipeline with Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once we have the pipeline object, we can apply several functions one after another using the pipe operator (|). In general, the workflow looks like this:
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
                                                          | [Second Transform]
                                                          | [Third Transform])
In our code, we create two custom functions. The regex_clean function scans the data and, for each pattern in the PATTERNS list, extracts the matching substring using re.search; it returns the results as a comma-separated string. If you are not a regular-expression expert, I recommend checking out this tutorial. The second, Split, is a custom DoFn applied with beam.ParDo: it takes the comma-separated string from regex_clean and returns a dictionary whose keys match the column names of our BigQuery table. Note that I had to import datetime inside the function to make it work; importing it at the top of the file gave me an import error, which was strange. The resulting list is then passed to WriteToBigQuery, which simply appends our data to the table. The code for the batch DataFlow job and the streaming DataFlow job is given below. The only difference between them is the source: the batch job reads the CSV from src_path using Beam's ReadFromText, while the streaming job reads from Pub/Sub with ReadFromPubSub.
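Matching seven independent patterns and then splitting on commas makes every field depend on its position in the list (and breaks if a value contains a comma). As an alternative, not the approach used in this article, a single regular expression with named groups can extract each field by name. The sketch below assumes a hypothetical log layout with the status code in square brackets; parse_log_line and LOG_PATTERN are illustrative names, and like the original code it stores the whole request line in request_type.

```python
# Sketch of an alternative to regex_clean + Split: one regex with named
# groups, so each value is picked by name instead of by list position.
# The log layout is an assumption; adjust LOG_PATTERN to your lines.
import re
from datetime import datetime

LOG_PATTERN = re.compile(
    r'^(?P<remote_addr>\S+) - - \[(?P<time_local>[^\]]+)\] '
    r'"(?P<request>[^"]*)" '
    r'\[(?P<status>\d+)\] (?P<body_bytes_sent>\d+) '
    r'"(?P<http_referer>[^"]*)" "(?P<http_user_agent>[^"]*)"$'
)


def parse_log_line(line):
    """Return a dict keyed by the BigQuery column names, or None if no match."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    fields = match.groupdict()
    # Same conversion as Split: 30/Apr/2019:21:11:42 -> 2019-04-30 21:11:42
    timestamp = datetime.strptime(fields['time_local'], '%d/%b/%Y:%H:%M:%S')
    return {
        'remote_addr': fields['remote_addr'],
        'timelocal': timestamp.strftime('%Y-%m-%d %H:%M:%S'),
        'request_type': fields['request'],
        'status': fields['status'],
        'body_bytes_sent': fields['body_bytes_sent'],
        'http_referer': fields['http_referer'],
        'http_user_agent': fields['http_user_agent'],
    }


if __name__ == '__main__':
    sample = ('192.168.0.1 - - [30/Apr/2019:21:11:42] "GET /home HTTP/1.1" '
              '[200] 5040 "http://www.example.com" "Mozilla/5.0 (X11; Linux x86_64)"')
    print(parse_log_line(sample))
```

In a pipeline, this could replace the two steps with beam.Map(parse_log_line) followed by beam.Filter(lambda row: row is not None) to drop lines that do not match.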
Batch DataFlow job (batch processing)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import re
import logging


PROJECT = 'user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"


def regex_clean(data):
    # One pattern per field; the output keeps the order of this list.
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'"(\S+)\s(\S+)\s*(\S*)"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for pattern in PATTERNS:
        reg_match = re.search(pattern, data)
        if reg_match:
            result.append(reg_match.group())
        else:
            # re.search returns None when nothing matches; keep a placeholder
            # so the remaining fields stay in their positions.
            logging.warning("There was an error with the regex search")
            result.append(" ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    return ','.join(result)


class Split(beam.DoFn):

    def process(self, element):
        # Imported here: Dataflow workers do not see the main script's
        # module-level imports unless the job runs with --save_main_session.
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        # Positions follow PATTERNS in regex_clean (same mapping as the streaming job).
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]


def main():
    p = beam.Pipeline(options=PipelineOptions())

    (p
     | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
     | "clean address" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Streaming DataFlow job (stream processing)
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import apache_beam as beam
import logging
import argparse
import re


PROJECT = "user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):
    # One pattern per field; the output keeps the order of this list.
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'"(\S+)\s(\S+)\s*(\S*)"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for pattern in PATTERNS:
        reg_match = re.search(pattern, data)
        if reg_match:
            result.append(reg_match.group())
        else:
            # Keep a placeholder so the remaining fields stay in position.
            logging.warning("There was an error with the regex search")
            result.append(" ")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    return ','.join(result)


class Split(beam.DoFn):

    def process(self, element):
        # Imported here: Dataflow workers do not see the main script's
        # module-level imports unless the job runs with --save_main_session.
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic", default=TOPIC)
    parser.add_argument("--output")
    # Our own flags go to known_args; everything else (--runner, --project,
    # --streaming, ...) is passed on to Beam.
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    # Reading from Pub/Sub requires streaming mode.
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)

    (p
     | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
     | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
     | "Clean Data" >> beam.Map(regex_clean)
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:userlogs.logdata'.format(PROJECT), schema=schema,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()
Running the pipeline
We can run the pipeline in several different ways. If we wanted to, we could simply run it locally from a terminal while logged in to GCP remotely:
python main_pipeline_stream.py \
    --input_topic "projects/user-logs-237110/topics/userlogs" \
    --streaming
However, we are going to run it on DataFlow. We can do this with the command below, setting the following required parameters:
project - the ID of your GCP project.
runner - the pipeline runner that will parse your program and construct your pipeline. To run in the cloud, you must specify DataflowRunner.
staging_location - the Cloud Storage path where Cloud Dataflow stages the code packages needed by the workers executing the job.
temp_location - the Cloud Storage path where Cloud Dataflow stores temporary job files created while the pipeline is running.
streaming - runs the pipeline in streaming mode.
python main_pipeline_stream.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --temp_location gs://$BUCKET/tmp \
    --staging_location gs://$BUCKET/staging \
    --streaming
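The streaming script mixes its own flag (--input_topic) with Beam's flags (--runner, --project, --streaming and so on). argparse's parse_known_args separates the two: it returns the flags the parser defines plus a list of everything else, which can then be handed to PipelineOptions. The sketch below shows only the argparse part so it runs without Beam installed; split_args and DEFAULT_TOPIC are hypothetical names.

```python
# Sketch: separate a script's own flags from Beam's pipeline flags.
# parse_known_args returns (known flags, list of unrecognised strings).
import argparse

DEFAULT_TOPIC = 'projects/user-logs-237110/topics/userlogs'


def split_args(argv):
    """Return (custom_args, pipeline_args) for a list of command-line strings."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_topic', default=DEFAULT_TOPIC)
    known_args, pipeline_args = parser.parse_known_args(argv)
    # In the pipeline, pipeline_args would go to PipelineOptions(pipeline_args)
    # (requires apache_beam, so it is not called here).
    return known_args, pipeline_args


if __name__ == '__main__':
    known, rest = split_args(['--runner=DataflowRunner', '--project=my-project', '--streaming'])
    print(known.input_topic)
    print(rest)
```

Here my-project is a placeholder. Using the --flag=value form keeps each Beam option as a single string in the leftover list.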
While this command is running, we can go to the DataFlow tab in the Google console and view our pipeline. If we click on the pipeline, we should see something similar to Figure 4. For debugging, it can be very helpful to go to Logs and then to Stackdriver to view detailed logs. This has helped me resolve pipeline issues on a number of occasions.
Figure 4: The Beam pipeline
Accessing our data in BigQuery
So we should now have a working pipeline with data flowing into our table. To check this, we can go to BigQuery and look at the data. After running the query below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can do further analysis, share the data with colleagues and start answering business questions.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5: BigQuery
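The same check can be scripted, and the data can be queried from Python to start answering questions like the ones in the conclusion. A minimal sketch, assuming google-cloud-bigquery is installed and credentials are set up: daily_visitors_sql builds a standard-SQL query that counts distinct remote_addr values per day (a rough proxy for visitors; timelocal is stored as a STRING, so it is parsed first), and run_query executes it. Both helper names and the query itself are illustrative.

```python
# Sketch: query the log table from Python. The table name matches this
# article; the daily-visitors query is an illustrative assumption.

TABLE = 'user-logs-237110.userlogs.logdata'


def daily_visitors_sql(table=TABLE):
    """Standard SQL that counts distinct remote_addr values per day."""
    return (
        'SELECT DATE(PARSE_DATETIME("%Y-%m-%d %H:%M:%S", timelocal)) AS log_date, '
        'COUNT(DISTINCT remote_addr) AS visitors '
        'FROM `{}` '
        'GROUP BY log_date ORDER BY log_date'
    ).format(table)


def run_query(sql, project='user-logs-237110'):
    """Run a query and return the rows as dicts (needs google-cloud-bigquery)."""
    # Imported lazily so daily_visitors_sql works without the library installed.
    from google.cloud import bigquery

    client = bigquery.Client(project=project)
    return [dict(row.items()) for row in client.query(sql).result()]


if __name__ == '__main__':
    print(daily_visitors_sql())
```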
Conclusion
We hope this post serves as a useful example of building a streaming data pipeline, and of finding ways to make data more accessible. Storing data in this format gives us many advantages. We can now start answering important questions, such as: how many people use our product? Is the user base growing over time? Which product features do people interact with most? And are there errors where there shouldn't be any? These are the questions an organization will care about. Based on the insights from the answers, we can improve the product and increase user engagement.
Beam is really useful for this kind of exercise and has a number of other interesting use cases. For example, you might want to analyze stock tick data in real time and make trades based on the analysis, or you might have sensor data coming from vehicles and want to calculate traffic-level metrics. You could also be, for example, a gaming company that collects user data and uses it to build dashboards that track key metrics. Well, folks, that is a topic for another post. Thanks for reading, and for those who want to see the full code, below is a link to my GitHub.
That's all.
Source: www.habr.com