Hello, everyone! We are sharing a translation of the final part of this article, prepared especially for students of the course.
Apache Beam and DataFlow for Real-Time Pipelines
Setting up Google Cloud
Note: I used Google Cloud Shell to run the pipeline and publish the user-log data because I had trouble running the pipeline in Python 3. Google Cloud Shell uses Python 2, which is far more compatible with Apache Beam.
To get the pipeline started, we need to dig into the settings a bit. For those of you who have never used GCP before, you will need to follow the six steps described here.
After that, we will need to upload our files to Google Cloud Storage and copy them into Google Cloud Shell. Uploading to cloud storage is fairly trivial (a description can be found here).
Figure 2
The commands we need for copying the files and installing the required libraries are listed below.
# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>
Creating our database and table
Once we have completed all the setup steps, the next thing we need to do is create a dataset and a table in BigQuery. There are several ways to do this, but the simplest is to use the Google Cloud console, creating the dataset first. You can follow the steps below.
Figure 3. Table schema
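Both pipeline scripts later in the post define this table layout as a comma-separated schema string. As a quick sanity check (a sketch of my own, not part of the original article), the string can be parsed to confirm it describes the seven STRING columns the table needs:

```python
# The schema string used by both DataFlow jobs below
schema = ('remote_addr:STRING, timelocal:STRING, request_type:STRING, '
          'status:STRING, body_bytes_sent:STRING, http_referer:STRING, '
          'http_user_agent:STRING')

# Split into (column_name, column_type) pairs
fields = [tuple(col.strip().split(':')) for col in schema.split(',')]
print(fields[0])    # ('remote_addr', 'STRING')
print(len(fields))  # 7
```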
Publishing the log data
Pub/Sub is a vital component of our pipeline because it allows multiple independent applications to interact. Specifically, it works as a middleman that lets us send and receive messages between applications. The first thing we need to do is create a topic: just go to Pub/Sub in the console and click CREATE TOPIC.
The code below calls our script to generate the log data described above, then connects to Pub/Sub and sends the logs to it. All we need to do is create a PublisherClient object, specify the path to the topic with the topic_path method, and call the publish function with topic_path and the data. Note that we import generate_log_line from our stream_logs script, so make sure these files are in the same folder, otherwise you will get an import error. We can then run this in our google console with:
python publish.py
from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time

PROJECT_ID = "user-logs-237110"
TOPIC = "userlogs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic, data=data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_path, message_future.exception()))
    else:
        print(message_future.result())

if __name__ == '__main__':
    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)
        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)
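The publisher above imports generate_log_line from the stream_logs script, whose full code is not reproduced here. As a rough, stdlib-only sketch of what such a generator might look like (the real script uses Faker, and this exact log format is an assumption):

```python
import random
from datetime import datetime

def generate_log_line():
    """Build a fake nginx-style access-log line (hypothetical format)."""
    ip = ".".join(str(random.randint(1, 254)) for _ in range(4))
    now = datetime.now().strftime("%d/%b/%Y:%H:%M:%S")
    method = random.choice(["GET", "POST"])
    uri = random.choice(["/", "/login", "/explore"])
    status = random.choice(["200", "404", "500"])
    size = str(random.randint(200, 5000))
    return '%s - - [%s] "%s %s HTTP/1.1" %s %s' % (
        ip, now, method, uri, status, size)

print(generate_log_line())
```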
Once the file is running, we can watch the log data print to the console as shown in the figure below. The script will keep running until we use CTRL + C to kill it.
Figure 4. Output of publish_logs.py
Coding our pipeline
Now that we have everything set up, we can get to the fun part: coding our pipeline using Beam and Python. To create a Beam pipeline, we need to create a pipeline object (p). Once the pipeline object is created, we can apply multiple transforms one after another using the pipe operator (|). In general, the workflow looks like the picture below.
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
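Beam implements the pipe syntax by overloading Python's `|` operator. As a toy, stdlib-only illustration of that chaining pattern (this is not Beam itself, just the idea behind the operator):

```python
class Pipe:
    """Minimal stand-in showing how '|' can chain transforms."""
    def __init__(self, value):
        self.value = value

    def __or__(self, fn):
        # Each '|' applies a transform and returns a new Pipe
        return Pipe(fn(self.value))

result = (Pipe([1, 2, 3])
          | (lambda xs: [x * 10 for x in xs])   # "First Transform"
          | (lambda xs: [x + 1 for x in xs])    # "Second Transform"
          | sum)                                # "Third Transform"
print(result.value)  # 63
```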
In our code, we will create two custom transforms. First, the regex_clean function, which scans the data and extracts the fields matching the PATTERNS list using re.search. The function returns a comma-separated string. If you are not a regular-expression expert, I recommend looking at a tutorial and practicing in a notebook to test the code. We then define a custom ParDo function called Split, which splits each comma-separated line into a dictionary. Note that I import datetime inside that function to make it work: I was getting an import error at the top of the file, which was odd. The resulting list is passed to the WriteToBigQuery function, which simply appends our data to the table. The code for both the Batch DataFlow Job and the Streaming DataFlow Job is given below. The only difference between the batch and the streaming code is that in the batch job we read a CSV from src_path using Beam's ReadFromText function.
Batch DataFlow Job (batch processing)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT = 'user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
src_path = "user_log_fileC.txt"

def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res

class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main():
    p = beam.Pipeline(options=PipelineOptions())
    (p
        | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
        | "clean address" >> beam.Map(regex_clean)
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
                                                       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()

if __name__ == '__main__':
    logger = logging.getLogger().setLevel(logging.INFO)
    main()
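The Split DoFn above parses the bracketed timestamp with datetime.strptime and reformats it for BigQuery. A standalone sketch of that conversion, using the same bracket-extraction regex and format strings (and assuming, as in the generated logs, that the timestamp carries no timezone suffix):

```python
import re
from datetime import datetime

def to_bigquery_timestamp(log_line):
    """Extract the [dd/Mon/yyyy:HH:MM:SS] timestamp and reformat it."""
    raw = re.search(r'(?<=\[).+?(?=\])', log_line).group()
    d = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S")
    return d.strftime("%Y-%m-%d %H:%M:%S")

sample = '192.168.1.1 - - [19/Apr/2019:06:09:13] "GET / HTTP/1.1" 200 512'
print(to_bigquery_timestamp(sample))  # 2019-04-19 06:09:13
```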
Streaming DataFlow Job (stream processing)
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re

PROJECT = "user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"

def regex_clean(data):
    PATTERNS = [r'(^\S+\.[\S+\.]+\S+)\s', r'(?<=\[).+?(?=\])',
                r'\"(\S+)\s(\S+)\s*(\S*)\"', r'\s(\d+)\s', r"(?<=\[).\d+(?=\])",
                r'\"[A-Z][a-z]+', r'\"(http|https)://[a-z]+\.[a-z]+\.[a-z]+']
    result = []
    for match in PATTERNS:
        try:
            reg_match = re.search(match, data).group()
            if reg_match:
                result.append(reg_match)
            else:
                result.append(" ")
        except:
            print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res

class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        return [{
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
        }]

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions())
    (p
        | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
        | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
        | "Clean Data" >> beam.Map(regex_clean)
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
                                                       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    logger = logging.getLogger().setLevel(logging.INFO)
    main()
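Note that main() uses parse_known_args rather than parse_args, so flags meant for Beam (such as --runner or --streaming) pass through untouched to PipelineOptions instead of being rejected. A small stdlib illustration of that split:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")

# Flags the parser doesn't recognize are returned, not rejected
known, leftover = parser.parse_known_args(
    ["--input_topic", "projects/p/topics/t",
     "--runner", "DataflowRunner", "--streaming"])

print(known.input_topic)  # projects/p/topics/t
print(leftover)           # ['--runner', 'DataflowRunner', '--streaming']
```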
Launching the pipeline
We can launch the pipeline in several different ways. If we wanted to, we could just run it locally from a terminal while logged in to GCP remotely.
python main_pipeline_stream.py \
    --input_topic "projects/user-logs-237110/topics/userlogs" \
    --streaming
However, we are going to run it with DataFlow. We can do so with the command below, setting the following required parameters.
- project - the ID of your GCP project.
- runner - the pipeline runner that analyzes your program and constructs your pipeline. To run in the cloud, you must specify DataflowRunner.
- staging_location - a path in Cloud Dataflow cloud storage for staging the code packages needed by the workers executing the job.
- temp_location - a path in Cloud Dataflow cloud storage for temporary files created while the pipeline runs.
- streaming - the flag that runs the pipeline in streaming mode.
python main_pipeline_stream.py \
    --runner DataflowRunner \
    --project $PROJECT \
    --temp_location $BUCKET/tmp \
    --staging_location $BUCKET/staging \
    --streaming
While this command is running, we can go to the DataFlow tab in the google console and view our pipeline. Clicking through to the pipeline, we should see something like Figure 4. For debugging, it can be very helpful to go to Logs and then to Stackdriver to view detailed logs. This has helped me sort out pipeline issues on several occasions.
Figure 4: The DataFlow pipeline
Accessing our data in BigQuery
So, we should now have a pipeline running with data flowing into our table. To test this, we can go to BigQuery and look at the data. After running the command below, you should see the first few rows of the dataset. Now that the data is stored in BigQuery, we can carry out further analysis, share the data with colleagues, and start answering business questions.
SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;
Figure 5: BigQuery
Conclusion
Hopefully this post serves as a useful example of building a streaming data pipeline, as well as of finding ways to make data more accessible. Storing data in this format gives us many advantages. Now we can start answering important questions, such as: how many people use our product? Is the user base growing over time? Which aspects of the product do people interact with most? And are there errors where there should not be any? These are the questions an organization will find interesting. Based on the insights that emerge from the answers, we can improve the product and increase user engagement.
Beam is really useful for this kind of exercise, and there are a number of other interesting use cases as well. For example, you might want to analyze stock tick data in real time and make trades based on the analysis; perhaps you have sensor data coming in from vehicles and want to compute traffic levels. You could also, for example, be a gaming company collecting user data and using it to build dashboards tracking key metrics. Okay, folks, that is a topic for another post. Thanks for reading, and for those who would like to see the full code, below is a link to my GitHub.
That's all.
Source: www.habr.com